diff --git a/examples/spring-streams-example/build.gradle.kts b/examples/spring-streams-example/build.gradle.kts index eb8b8dc83..36740329d 100644 --- a/examples/spring-streams-example/build.gradle.kts +++ b/examples/spring-streams-example/build.gradle.kts @@ -19,7 +19,7 @@ dependencies { implementation(libs.kafka.streams) implementation(libs.kotlin.reflect) implementation(libs.google.protobuf.kotlin) - implementation(libs.kafka.streams.registry) + implementation(libs.kafka.streams.protobuf.serde) } dependencies { diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index dc378953e..f4c15b7ae 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -49,6 +49,7 @@ mockito = "5.4.0" quiver = "0.5.12" akkurate = "0.10.0" exposed = "0.56.0" +kotlinx-serialization = "1.7.3" [libraries] kotlin-stdlib-jdk8 = { module = "org.jetbrains.kotlin:kotlin-stdlib-jdk8", version.ref = "kotlin" } @@ -61,7 +62,8 @@ kotlinx-slf4j = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-slf4j", ver kotlinx-knit = { module = "org.jetbrains.kotlinx:kotlinx-knit", version.ref = "knit" } kotlinx-io-reactor = { module = "io.projectreactor:reactor-core", version.ref = "io-reactor" } kotlinx-io-reactor-extensions = { module = "io.projectreactor.kotlin:reactor-kotlin-extensions", version.ref = "io-reactor-extensions" } -kotlinx-serialization = { module = "org.jetbrains.kotlinx:kotlinx-serialization-json-jvm", version = "1.7.3" } +kotlinx-serialization-json = { module = "org.jetbrains.kotlinx:kotlinx-serialization-json-jvm", version.ref = "kotlinx-serialization" } +kotlinx-serialization-protobuf = { module = "org.jetbrains.kotlinx:kotlinx-serialization-protobuf", version.ref = "kotlinx-serialization" } # Arrow arrow-core = { module = "io.arrow-kt:arrow-core", version.ref = "arrow" } @@ -86,7 +88,7 @@ akkurate-ksp-plugin = { module = "dev.nesk.akkurate:akkurate-ksp-plugin", versio kafka = { module = "org.apache.kafka:kafka-clients", version.ref = "kafka" } kafkaKotlin = { module = "io.github.nomisrev:kotlin-kafka", version.ref = "kafka-kotlin" } kafka-streams = { module = "org.apache.kafka:kafka-streams", version.ref = "kafka" } -kafka-streams-registry = { module = "io.confluent:kafka-streams-protobuf-serde", version.ref = "kafka-streams-registry" } +kafka-streams-protobuf-serde = { module = "io.confluent:kafka-streams-protobuf-serde", version.ref = "kafka-streams-registry" } # Couchbase couchbase-kotlin = { module = "com.couchbase.client:kotlin-client", version.ref = "couchbase-kotlin" } diff --git a/lib/stove-testing-e2e/build.gradle.kts b/lib/stove-testing-e2e/build.gradle.kts index 479e391f2..d523620f5 100644 --- a/lib/stove-testing-e2e/build.gradle.kts +++ b/lib/stove-testing-e2e/build.gradle.kts @@ -8,7 +8,7 @@ dependencies { api(libs.jackson.kotlin) api(libs.jackson.arrow) api(libs.google.gson) - api(libs.kotlinx.serialization) + api(libs.kotlinx.serialization.json) api(libs.testcontainers) { version { require(libs.testcontainers.asProvider().get().version!!) diff --git a/starters/spring/stove-spring-testing-e2e-kafka/build.gradle.kts b/starters/spring/stove-spring-testing-e2e-kafka/build.gradle.kts index c37d5253c..041ba0e9b 100644 --- a/starters/spring/stove-spring-testing-e2e-kafka/build.gradle.kts +++ b/starters/spring/stove-spring-testing-e2e-kafka/build.gradle.kts @@ -1,3 +1,9 @@ +import com.google.protobuf.gradle.id + +plugins { + alias(libs.plugins.protobuf) +} + dependencies { api(projects.lib.stoveTestingE2e) api(libs.testcontainers.kafka) @@ -11,4 +17,22 @@ dependencies { testImplementation(libs.spring.boot.autoconfigure) testImplementation(projects.starters.spring.stoveSpringTestingE2e) testImplementation(libs.logback.classic) + testImplementation(libs.google.protobuf.kotlin) + testImplementation(libs.kafka.streams.protobuf.serde) +} + +protobuf { + protoc { + artifact = libs.protoc.get().toString() + } + + generateProtoTasks { + all().forEach { + it.descriptorSetOptions.includeSourceInfo = true + it.descriptorSetOptions.includeImports = true + it.builtins { id("kotlin") } + } + } } + + diff --git a/starters/spring/stove-spring-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/kafka/Options.kt b/starters/spring/stove-spring-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/kafka/Options.kt index 1ced67041..c977f8d23 100644 --- a/starters/spring/stove-spring-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/kafka/Options.kt +++ b/starters/spring/stove-spring-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/kafka/Options.kt @@ -47,17 +47,14 @@ data class FallbackTemplateSerde( @StoveDsl data class KafkaSystemOptions( - /** * The registry of the Kafka image. The default value is `DEFAULT_REGISTRY`. */ val registry: String = DEFAULT_REGISTRY, - /** * The ports of the Kafka container. The default value is `DEFAULT_KAFKA_PORTS`. */ val ports: List = DEFAULT_KAFKA_PORTS, - /** * The serde that is used while publishing the messages. The default value is [StoveSerde.jackson]. * You can also pass ser/de during the publish operation. @@ -68,26 +65,22 @@ data class KafkaSystemOptions( * ``` */ val serdeForPublish: StoveSerde = StoveSerde.jackson.anyJsonStringSerde(), - /** * The fallback serde for Kafka. It is used to serialize and deserialize the messages before sending them to Kafka. * If no [KafkaTemplate] is provided, it will be used to create a new [KafkaTemplate]. * Most of the time you won't need this. */ val fallbackSerde: FallbackTemplateSerde = FallbackTemplateSerde(), - /** * Container options for Kafka. */ val containerOptions: KafkaContainerOptions = KafkaContainerOptions(), - /** * Operations for Kafka. It is used to customize the operations of Kafka. * The reason why this exists is to provide a way to interact with lower versions of Spring-Kafka dependencies. * @see KafkaOps */ val ops: KafkaOps = KafkaOps(), - /** * The configuration of the Kafka settings that is exposed to the Application Under Test(AUT). */ diff --git a/starters/spring/stove-spring-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/kafka/StringSerdeKafkaSystemTest.kt b/starters/spring/stove-spring-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/kafka/StringSerdeKafkaSystemTest.kt deleted file mode 100644 index 0d8ec253e..000000000 --- a/starters/spring/stove-spring-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/kafka/StringSerdeKafkaSystemTest.kt +++ /dev/null @@ -1,234 +0,0 @@ -package com.trendyol.stove.testing.e2e.kafka - -import arrow.core.some -import com.trendyol.stove.testing.e2e.serialization.StoveSerde -import com.trendyol.stove.testing.e2e.springBoot -import com.trendyol.stove.testing.e2e.system.TestSystem -import com.trendyol.stove.testing.e2e.system.TestSystem.Companion.validate -import io.kotest.core.spec.style.ShouldSpec -import io.kotest.matchers.shouldBe -import org.apache.kafka.clients.admin.NewTopic -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.clients.producer.ProducerConfig -import org.apache.kafka.common.serialization.Serdes -import org.slf4j.Logger -import org.springframework.boot.* -import org.springframework.boot.autoconfigure.SpringBootApplication -import org.springframework.boot.context.properties.* -import org.springframework.context.ConfigurableApplicationContext -import org.springframework.context.annotation.Bean -import org.springframework.context.support.beans -import org.springframework.kafka.annotation.* -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory -import org.springframework.kafka.core.* -import org.springframework.kafka.listener.* -import org.springframework.util.backoff.FixedBackOff -import kotlin.random.Random - -@SpringBootApplication -@EnableKafka -@EnableConfigurationProperties(KafkaTestSpringBotApplicationForStringSerde.KafkaTestSpringBotApplicationConfiguration::class) -open class KafkaTestSpringBotApplicationForStringSerde { - companion object { - fun run( - args: Array, - init: SpringApplication.() -> Unit = {} - ): ConfigurableApplicationContext { - System.setProperty("org.springframework.boot.logging.LoggingSystem", "none") - return runApplication(args = args) { - webApplicationType = WebApplicationType.NONE - init() - } - } - } - - private val logger: Logger = org.slf4j.LoggerFactory.getLogger(javaClass) - - @ConfigurationProperties(prefix = "kafka") - @ConstructorBinding - data class KafkaTestSpringBotApplicationConfiguration( - val bootstrapServers: String, - val groupId: String, - val offset: String - ) - - @Bean - open fun kafkaListenerContainerFactory( - consumerFactory: ConsumerFactory, - interceptor: RecordInterceptor, - recoverer: DeadLetterPublishingRecoverer - ): ConcurrentKafkaListenerContainerFactory { - val factory = ConcurrentKafkaListenerContainerFactory() - factory.consumerFactory = consumerFactory - factory.setCommonErrorHandler( - DefaultErrorHandler( - recoverer, - FixedBackOff(20, 1) - ).also { it.addNotRetryableExceptions(StoveBusinessException::class.java) } - ) - factory.setRecordInterceptor(interceptor) - return factory - } - - @Bean - open fun recoverer( - kafkaTemplate: KafkaTemplate<*, *> - ): DeadLetterPublishingRecoverer = DeadLetterPublishingRecoverer(kafkaTemplate) - - @Bean - open fun consumerFactory( - config: KafkaTestSpringBotApplicationConfiguration - ): ConsumerFactory = DefaultKafkaConsumerFactory( - mapOf( - ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to config.bootstrapServers, - ConsumerConfig.GROUP_ID_CONFIG to config.groupId, - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to config.offset, - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to Serdes.String().deserializer().javaClass, - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to Serdes.String().deserializer().javaClass, - ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG to 2000, - ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG to 6000, - ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG to 6000 - ) - ) - - @Bean - open fun kafkaTemplate( - config: KafkaTestSpringBotApplicationConfiguration - ): KafkaTemplate = KafkaTemplate( - DefaultKafkaProducerFactory( - mapOf( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to config.bootstrapServers, - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to Serdes.String().serializer().javaClass, - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to Serdes.String().serializer().javaClass, - ProducerConfig.ACKS_CONFIG to "1" - ) - ) - ) - - @KafkaListener(topics = ["topic"], groupId = "group_id") - fun listen(message: String) { - logger.info("Received Message in consumer: $message") - } - - @KafkaListener(topics = ["topic-failed"], groupId = "group_id") - fun listen_failed(message: String) { - logger.info("Received Message in failed consumer: $message") - throw StoveBusinessException("This exception is thrown intentionally for testing purposes.") - } - - @KafkaListener(topics = ["topic-failed.DLT"], groupId = "group_id") - fun listen_dead_letter(message: String) { - logger.info("Received Message in the lead letter, and allowing the fail by just logging: $message") - } -} - -class StoveBusinessException(message: String) : Exception(message) - -class StringSerdeKafkaSystemTests : ShouldSpec({ - beforeSpec { - TestSystem() - .with { - kafka { - KafkaSystemOptions( - serdeForPublish = StoveSerde.jackson.anyJsonStringSerde(), - configureExposedConfiguration = { - listOf( - "kafka.bootstrapServers=${it.bootstrapServers}", - "kafka.groupId=test-group", - "kafka.offset=earliest" - ) - }, - containerOptions = KafkaContainerOptions { - } - ) - } - springBoot( - runner = { params -> - KafkaTestSpringBotApplicationForStringSerde.run(params) { - addInitializers( - beans { - bean>() - bean { StoveSerde.jackson.anyByteArraySerde() } - } - ) - } - } - ) - }.run() - } - - afterSpec { - TestSystem.stop() - } - - should("publish and consume") { - validate { - kafka { - val userId = Random.nextInt().toString() - val message = "this message is coming from ${testCase.descriptor.id.value} and testName is ${testCase.name.testName}" - val headers = mapOf("x-user-id" to userId) - publish("topic", message, headers = headers) - shouldBePublished { - actual == message && this.metadata.headers["x-user-id"] == userId && this.metadata.topic == "topic" - } - shouldBeConsumed { - actual == message && this.metadata.headers["x-user-id"] == userId && this.metadata.topic == "topic" - } - } - } - } - - should("publish and consume with failed consumer") { - shouldThrowMaybe { - validate { - kafka { - val userId = Random.nextInt().toString() - val message = "this message is coming from ${testCase.descriptor.id.value} and testName is ${testCase.name.testName}" - val headers = mapOf("x-user-id" to userId) - publish("topic-failed", message, headers = headers) - shouldBePublished { - actual == message && this.metadata.headers["x-user-id"] == userId && this.metadata.topic == "topic-failed" - } - shouldBeFailed { - actual == message && this.metadata.headers["x-user-id"] == userId && this.metadata.topic == "topic-failed" && reason is StoveBusinessException - } - - shouldBePublished { - actual == message && this.metadata.headers["x-user-id"] == userId && this.metadata.topic == "topic-failed.DLT" - } - } - } - } - } - - should("admin operations") { - validate { - kafka { - adminOperations { - val topic = "topic" - createTopics(listOf(NewTopic(topic, 1, 1))) - listTopics().names().get().contains(topic) shouldBe true - deleteTopics(listOf(topic)) - listTopics().names().get().contains(topic) shouldBe false - } - } - } - } - - should("publish with ser/de") { - validate { - kafka { - val userId = Random.nextInt().toString() - val message = "this message is coming from ${testCase.descriptor.id.value} and testName is ${testCase.name.testName}" - val headers = mapOf("x-user-id" to userId) - publish("topic", message, serde = StoveSerde.jackson.anyJsonStringSerde().some(), headers = headers) - shouldBePublished { - actual == message && this.metadata.headers["x-user-id"] == userId && this.metadata.topic == "topic" - } - shouldBeConsumed { - actual == message && this.metadata.headers["x-user-id"] == userId && this.metadata.topic == "topic" - } - } - } - } -}) diff --git a/starters/spring/stove-spring-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/kafka/protobufserde/ProtobufSerdeKafkaSystemTest.kt b/starters/spring/stove-spring-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/kafka/protobufserde/ProtobufSerdeKafkaSystemTest.kt new file mode 100644 index 000000000..eafb2be1e --- /dev/null +++ b/starters/spring/stove-spring-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/kafka/protobufserde/ProtobufSerdeKafkaSystemTest.kt @@ -0,0 +1,97 @@ +package com.trendyol.stove.testing.e2e.kafka.protobufserde + +import com.google.protobuf.* +import com.trendyol.stove.spring.testing.e2e.kafka.v1.Example.Product +import com.trendyol.stove.spring.testing.e2e.kafka.v1.product +import com.trendyol.stove.testing.e2e.kafka.* +import com.trendyol.stove.testing.e2e.serialization.StoveSerde +import com.trendyol.stove.testing.e2e.springBoot +import com.trendyol.stove.testing.e2e.system.TestSystem +import com.trendyol.stove.testing.e2e.system.TestSystem.Companion.validate +import io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde +import io.kotest.core.spec.style.ShouldSpec +import kotlinx.serialization.ExperimentalSerializationApi +import org.springframework.context.support.beans +import kotlin.random.Random + +@Suppress("UNCHECKED_CAST") +@OptIn(ExperimentalSerializationApi::class) +class StoveProtobufSerde : StoveSerde { + private val protobufSerde: KafkaProtobufSerde = KafkaRegistry.createSerdeBasedOnRegistry() + + override fun serialize(value: Any): ByteArray = protobufSerde.serializer().serialize("any", value as Message) + + override fun deserialize(value: ByteArray, clazz: Class): T { + val ret: Message = protobufSerde.deserializer().deserialize("any", value) + ret as DynamicMessage + val parseFromMethod = clazz.getDeclaredMethod("parseFrom", ByteArray::class.java) + val obj = parseFromMethod(ret, ret.toByteArray()) as T + return obj + } +} + +class ProtobufSerdeKafkaSystemTest : ShouldSpec({ + beforeSpec { + TestSystem() + .with { + kafka { + KafkaSystemOptions( + serdeForPublish = StoveProtobufSerde(), + configureExposedConfiguration = { + listOf( + "kafka.bootstrapServers=${it.bootstrapServers}", + "kafka.groupId=test-group", + "kafka.offset=earliest", + "kafka.schemaRegistryUrl=mock://mock-registry" + ) + }, + containerOptions = KafkaContainerOptions { + } + ) + } + springBoot( + runner = { params -> + KafkaTestSpringBotApplicationForProtobufSerde.run(params) { + addInitializers( + beans { + bean>() + bean { StoveProtobufSerde() } + } + ) + } + }, + withParameters = listOf( + "spring.lifecycle.timeout-per-shutdown-phase=0s" + ) + ) + }.run() + } + + afterSpec { + TestSystem.stop() + } + + should("publish and consume") { + validate { + kafka { + val userId = Random.nextInt().toString() + val productId = Random.nextInt().toString() + val message = product { + id = productId + name = "product-${Random.nextInt()}" + price = Random.nextDouble() + currency = "eur" + description = "description-${Random.nextInt()}" + } + val headers = mapOf("x-user-id" to userId) + publish("topic-protobuf", message, headers = headers) + shouldBePublished { + actual == message && this.metadata.headers["x-user-id"] == userId && this.metadata.topic == "topic-protobuf" + } + shouldBeConsumed { + actual == message && this.metadata.headers["x-user-id"] == userId && this.metadata.topic == "topic-protobuf" + } + } + } + } +}) diff --git a/starters/spring/stove-spring-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/kafka/protobufserde/app.kt b/starters/spring/stove-spring-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/kafka/protobufserde/app.kt new file mode 100644 index 000000000..b27fff530 --- /dev/null +++ b/starters/spring/stove-spring-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/kafka/protobufserde/app.kt @@ -0,0 +1,158 @@ +package com.trendyol.stove.testing.e2e.kafka.protobufserde + +import com.google.protobuf.Message +import com.trendyol.stove.testing.e2e.kafka.StoveBusinessException +import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG +import io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization.* +import org.slf4j.* +import org.springframework.boot.* +import org.springframework.boot.autoconfigure.SpringBootApplication +import org.springframework.boot.context.properties.* +import org.springframework.context.ConfigurableApplicationContext +import org.springframework.context.annotation.Bean +import org.springframework.kafka.annotation.* +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory +import org.springframework.kafka.core.* +import org.springframework.kafka.listener.* +import org.springframework.util.backoff.FixedBackOff + +sealed class KafkaRegistry(open val url: String) { + object Mock : KafkaRegistry("mock://mock-registry") + + data class Defined(override val url: String) : KafkaRegistry(url) + + companion object { + fun createSerdeBasedOnRegistry(registry: KafkaRegistry = Mock): KafkaProtobufSerde { + val schemaRegistryClient = when (registry) { + is Mock -> MockSchemaRegistry.getClientForScope("mock-registry") + is Defined -> MockSchemaRegistry.getClientForScope(registry.url) + } + val serde: KafkaProtobufSerde = KafkaProtobufSerde(schemaRegistryClient) + val serdeConfig: MutableMap = HashMap() + serdeConfig[SCHEMA_REGISTRY_URL_CONFIG] = registry.url + serde.configure(serdeConfig, false) + return serde + } + } +} + +class StoveKafkaValueSerializer : Serializer { + private val protobufSerde: KafkaProtobufSerde = KafkaRegistry.createSerdeBasedOnRegistry() + + override fun serialize( + topic: String, + data: T + ): ByteArray = when (data) { + is ByteArray -> data + else -> protobufSerde.serializer().serialize(topic, data as Message) + } +} + +class StoveKafkaValueDeserializer : Deserializer { + private val protobufSerde: KafkaProtobufSerde = KafkaRegistry.createSerdeBasedOnRegistry() + + override fun deserialize( + topic: String, + data: ByteArray + ): Message = protobufSerde.deserializer().deserialize(topic, data) +} + +@SpringBootApplication(scanBasePackages = ["com.trendyol.stove.testing.e2e.kafka.protobufserde"]) +@EnableKafka +@EnableConfigurationProperties(KafkaTestSpringBotApplicationForProtobufSerde.ProtobufSerdeKafkaConf::class) +open class KafkaTestSpringBotApplicationForProtobufSerde { + companion object { + fun run( + args: Array, + init: SpringApplication.() -> Unit = {} + ): ConfigurableApplicationContext { + System.setProperty("org.springframework.boot.logging.LoggingSystem", "none") + return runApplication(args = args) { + webApplicationType = WebApplicationType.NONE + init() + } + } + } + + private val logger: Logger = LoggerFactory.getLogger(javaClass) + + @ConfigurationProperties(prefix = "kafka") + @ConstructorBinding + data class ProtobufSerdeKafkaConf( + val bootstrapServers: String, + val groupId: String, + val offset: String, + val schemaRegistryUrl: String + ) + + @Bean + open fun createConfiguredSerdeForRecordValues(config: ProtobufSerdeKafkaConf): KafkaProtobufSerde { + val registry = when { + config.schemaRegistryUrl.contains("mock://") -> KafkaRegistry.Mock + else -> KafkaRegistry.Defined(config.schemaRegistryUrl) + } + return KafkaRegistry.createSerdeBasedOnRegistry(registry) + } + + @Bean + open fun kafkaListenerContainerFactory( + consumerFactory: ConsumerFactory, + interceptor: RecordInterceptor, + recoverer: DeadLetterPublishingRecoverer + ): ConcurrentKafkaListenerContainerFactory { + val factory = ConcurrentKafkaListenerContainerFactory() + factory.consumerFactory = consumerFactory + factory.setCommonErrorHandler( + DefaultErrorHandler( + recoverer, + FixedBackOff(20, 1) + ).also { it.addNotRetryableExceptions(StoveBusinessException::class.java) } + ) + factory.setRecordInterceptor(interceptor) + return factory + } + + @Bean + open fun recoverer( + kafkaTemplate: KafkaTemplate<*, *> + ): DeadLetterPublishingRecoverer = DeadLetterPublishingRecoverer(kafkaTemplate) + + @Bean + open fun consumerFactory( + config: ProtobufSerdeKafkaConf + ): ConsumerFactory = DefaultKafkaConsumerFactory( + mapOf( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to config.bootstrapServers, + ConsumerConfig.GROUP_ID_CONFIG to config.groupId, + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to config.offset, + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to Serdes.String().deserializer().javaClass, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StoveKafkaValueDeserializer().javaClass, + ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG to 2000, + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG to 6000, + ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG to 6000 + ) + ) + + @Bean + open fun kafkaTemplate( + config: ProtobufSerdeKafkaConf + ): KafkaTemplate = KafkaTemplate( + DefaultKafkaProducerFactory( + mapOf( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to config.bootstrapServers, + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to Serdes.String().serializer().javaClass, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StoveKafkaValueSerializer().javaClass, + ProducerConfig.ACKS_CONFIG to "1" + ) + ) + ) + + @KafkaListener(topics = ["topic-protobuf"], groupId = "group_id") + fun listen(message: Message) { + logger.info("Received Message in consumer: $message") + } +} diff --git a/starters/spring/stove-spring-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/kafka/Setup.kt b/starters/spring/stove-spring-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/kafka/shared.kt similarity index 78% rename from starters/spring/stove-spring-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/kafka/Setup.kt rename to starters/spring/stove-spring-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/kafka/shared.kt index 33143b2fc..6d4506926 100644 --- a/starters/spring/stove-spring-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/kafka/Setup.kt +++ b/starters/spring/stove-spring-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/kafka/shared.kt @@ -7,3 +7,5 @@ class Setup : AbstractProjectConfig() { @ExperimentalKotest override val concurrentSpecs: Int = 1 } + +class StoveBusinessException(message: String) : Exception(message) diff --git a/starters/spring/stove-spring-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/kafka/stringserde/StringSerdeKafkaSystemTest.kt b/starters/spring/stove-spring-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/kafka/stringserde/StringSerdeKafkaSystemTest.kt new file mode 100644 index 000000000..6470e3a68 --- /dev/null +++ b/starters/spring/stove-spring-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/kafka/stringserde/StringSerdeKafkaSystemTest.kt @@ -0,0 +1,129 @@ +package com.trendyol.stove.testing.e2e.kafka.stringserde + +import arrow.core.some +import com.trendyol.stove.testing.e2e.kafka.* +import com.trendyol.stove.testing.e2e.serialization.StoveSerde +import com.trendyol.stove.testing.e2e.springBoot +import com.trendyol.stove.testing.e2e.system.TestSystem +import com.trendyol.stove.testing.e2e.system.TestSystem.Companion.validate +import io.kotest.core.spec.style.ShouldSpec +import io.kotest.matchers.shouldBe +import org.apache.kafka.clients.admin.NewTopic +import org.springframework.context.support.beans +import org.springframework.kafka.support.JacksonUtils +import kotlin.random.Random + +class StringSerdeKafkaSystemTests : ShouldSpec({ + beforeSpec { + TestSystem() + .with { + kafka { + KafkaSystemOptions( + serdeForPublish = StoveSerde.jackson.anyJsonStringSerde(JacksonUtils.enhancedObjectMapper()), + configureExposedConfiguration = { + listOf( + "kafka.bootstrapServers=${it.bootstrapServers}", + "kafka.groupId=test-group", + "kafka.offset=earliest" + ) + }, + containerOptions = KafkaContainerOptions { + } + ) + } + springBoot( + runner = { params -> + KafkaTestSpringBotApplicationForStringSerde.run(params) { + addInitializers( + beans { + bean>() + bean { StoveSerde.jackson.anyByteArraySerde() } + } + ) + } + }, + withParameters = listOf( + "spring.lifecycle.timeout-per-shutdown-phase=0s" + ) + ) + }.run() + } + + afterSpec { + TestSystem.stop() + } + + should("publish and consume") { + validate { + kafka { + val userId = Random.nextInt().toString() + val message = + "this message is coming from ${testCase.descriptor.id.value} and testName is ${testCase.name.testName}" + val headers = mapOf("x-user-id" to userId) + publish("topic", message, headers = headers) + shouldBePublished { + actual == message && this.metadata.headers["x-user-id"] == userId && this.metadata.topic == "topic" + } + shouldBeConsumed { + actual == message && this.metadata.headers["x-user-id"] == userId && this.metadata.topic == "topic" + } + } + } + } + + should("publish and consume with failed consumer") { + shouldThrowMaybe { + validate { + kafka { + val userId = Random.nextInt().toString() + val message = + "this message is coming from ${testCase.descriptor.id.value} and testName is ${testCase.name.testName}" + val headers = mapOf("x-user-id" to userId) + publish("topic-failed", message, headers = headers) + shouldBePublished { + actual == message && this.metadata.headers["x-user-id"] == userId && this.metadata.topic == "topic-failed" + } + shouldBeFailed { + actual == message && this.metadata.headers["x-user-id"] == userId && this.metadata.topic == "topic-failed" && reason is StoveBusinessException + } + + shouldBePublished { + actual == message && this.metadata.headers["x-user-id"] == userId && this.metadata.topic == "topic-failed.DLT" + } + } + } + } + } + + should("admin operations") { + validate { + kafka { + adminOperations { + val topic = "topic" + createTopics(listOf(NewTopic(topic, 1, 1))) + listTopics().names().get().contains(topic) shouldBe true + deleteTopics(listOf(topic)) + listTopics().names().get().contains(topic) shouldBe false + } + } + } + } + + should("publish with ser/de") { + validate { + kafka { + val userId = Random.nextInt().toString() + val message = + "this message is coming from ${testCase.descriptor.id.value} and testName is ${testCase.name.testName}" + val headers = mapOf("x-user-id" to userId) + publish("topic", message, serde = StoveSerde.jackson.anyJsonStringSerde().some(), headers = headers) + shouldBePublished { + actual == message && this.metadata.headers["x-user-id"] == userId && this.metadata.topic == "topic" + } + shouldBeConsumed { + actual == message && this.metadata.headers["x-user-id"] == userId && this.metadata.topic == "topic" + } + } + } + } +}) diff --git a/starters/spring/stove-spring-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/kafka/stringserde/app.kt b/starters/spring/stove-spring-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/kafka/stringserde/app.kt new file mode 100644 index 000000000..fcd511e95 --- /dev/null +++ b/starters/spring/stove-spring-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/kafka/stringserde/app.kt @@ -0,0 +1,114 @@ +package com.trendyol.stove.testing.e2e.kafka.stringserde + +import com.trendyol.stove.testing.e2e.kafka.StoveBusinessException +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization.Serdes +import org.slf4j.* +import org.springframework.boot.* +import org.springframework.boot.autoconfigure.SpringBootApplication +import org.springframework.boot.context.properties.* +import org.springframework.context.ConfigurableApplicationContext +import org.springframework.context.annotation.Bean +import org.springframework.kafka.annotation.* +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory +import org.springframework.kafka.core.* +import org.springframework.kafka.listener.* +import org.springframework.util.backoff.FixedBackOff + +@SpringBootApplication(scanBasePackages = ["com.trendyol.stove.testing.e2e.kafka.stringserde"]) +@EnableKafka +@EnableConfigurationProperties(KafkaTestSpringBotApplicationForStringSerde.StringSerdeKafkaConf::class) +open class KafkaTestSpringBotApplicationForStringSerde { + companion object { + fun run( + args: Array, + init: SpringApplication.() -> Unit = {} + ): ConfigurableApplicationContext { + System.setProperty("org.springframework.boot.logging.LoggingSystem", "none") + return runApplication(args = args) { + webApplicationType = WebApplicationType.NONE + init() + } + } + } + + private val logger: Logger = LoggerFactory.getLogger(javaClass) + + @ConfigurationProperties(prefix = "kafka") + @ConstructorBinding + data class StringSerdeKafkaConf( + val bootstrapServers: String, + val groupId: String, + val offset: String + ) + + @Bean + open fun kafkaListenerContainerFactory( + consumerFactory: ConsumerFactory, + interceptor: RecordInterceptor, + recoverer: DeadLetterPublishingRecoverer + ): ConcurrentKafkaListenerContainerFactory { + val factory = ConcurrentKafkaListenerContainerFactory() + factory.consumerFactory = consumerFactory + factory.setCommonErrorHandler( + DefaultErrorHandler( + recoverer, + FixedBackOff(20, 1) + ).also { it.addNotRetryableExceptions(StoveBusinessException::class.java) } + ) + factory.setRecordInterceptor(interceptor) + return factory + } + + @Bean + open fun recoverer( + kafkaTemplate: KafkaTemplate<*, *> + ): DeadLetterPublishingRecoverer = DeadLetterPublishingRecoverer(kafkaTemplate) + + @Bean + open fun consumerFactory( + config: StringSerdeKafkaConf + ): ConsumerFactory = DefaultKafkaConsumerFactory( + mapOf( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to config.bootstrapServers, + ConsumerConfig.GROUP_ID_CONFIG to config.groupId, + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to config.offset, + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to Serdes.String().deserializer().javaClass, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to Serdes.String().deserializer().javaClass, + ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG to 2000, + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG to 6000, + ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG to 6000 + ) + ) + + @Bean + open fun kafkaTemplate( + config: StringSerdeKafkaConf + ): KafkaTemplate = KafkaTemplate( + DefaultKafkaProducerFactory( + mapOf( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to config.bootstrapServers, + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to Serdes.String().serializer().javaClass, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to Serdes.String().serializer().javaClass, + ProducerConfig.ACKS_CONFIG to "1" + ) + ) + ) + + @KafkaListener(topics = ["topic"], groupId = "group_id") + fun listen(message: String) { + logger.info("Received Message in consumer: $message") + } + + @KafkaListener(topics = ["topic-failed"], groupId = "group_id") + fun listenFailed(message: String) { + logger.info("Received Message in failed consumer: $message") + throw StoveBusinessException("This exception is thrown intentionally for testing purposes.") + } + + @KafkaListener(topics = ["topic-failed.DLT"], groupId = "group_id") + fun listenDeadLetter(message: String) { + logger.info("Received Message in the lead letter, and allowing the fail by just logging: $message") + } +} diff --git a/starters/spring/stove-spring-testing-e2e-kafka/src/test/proto/example.proto b/starters/spring/stove-spring-testing-e2e-kafka/src/test/proto/example.proto new file mode 100644 index 000000000..5290ea0d7 --- /dev/null +++ b/starters/spring/stove-spring-testing-e2e-kafka/src/test/proto/example.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; + +// buf:lint:ignore PACKAGE_DIRECTORY_MATCH +package com.trendyol.stove.spring.testing.e2e.kafka.v1; + +message Product { + string id = 1; + string name = 2; + string description = 3; + double price = 4; + string currency = 5; +}