From 903292d25b190416aa734e2bc8a3855e4a6a8a27 Mon Sep 17 00:00:00 2001 From: Oguzhan Soykan Date: Wed, 1 May 2024 10:33:00 +0200 Subject: [PATCH] tests --- .../kafka/intercepting/MessageSinkOps.kt | 1 - .../e2e/standalone/kafka/setup/ProjectConfig.kt | 4 ++++ .../kafka/setup/example/StoveListener.kt | 14 ++++++-------- .../setup/example/consumers/ProductConsumer.kt | 2 +- 4 files changed, 11 insertions(+), 10 deletions(-) diff --git a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/MessageSinkOps.kt b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/MessageSinkOps.kt index 6402090d..19932e49 100644 --- a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/MessageSinkOps.kt +++ b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/MessageSinkOps.kt @@ -89,7 +89,6 @@ internal interface MessageSinkOps : MessageSinkPublishOps, CommonOps { } failedFunc.waitUntilCount(atLeastIn, times) - throwIfFailed(clazz, condition) } override fun dumpMessages(): String = "Sink so far:\n$store" diff --git a/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/ProjectConfig.kt b/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/ProjectConfig.kt index f6c38adb..8592689f 100644 --- a/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/ProjectConfig.kt +++ b/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/ProjectConfig.kt @@ -69,6 +69,10 @@ class KafkaApplicationUnderTest : ApplicationUnderTest { @ExperimentalKotest class ProjectConfig : AbstractProjectConfig() { + init { + stoveKafkaBridgePortDefault = "50052" + } + override fun extensions(): List = listOf( SystemEnvironmentProjectListener(STOVE_KAFKA_BRIDGE_PORT, stoveKafkaBridgePortDefault) ) diff --git a/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/StoveListener.kt b/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/StoveListener.kt index 462c7100..09f01462 100644 --- a/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/StoveListener.kt +++ b/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/StoveListener.kt @@ -53,12 +53,11 @@ abstract class StoveListener( publisher.publishScope { offer( ProducerRecord( - // topic = topicDefinition.retryTopic, - // key = + message.partition(), message.key(), - // value = - message.value() + message.value(), + message.headers() ) ) } @@ -70,12 +69,11 @@ abstract class StoveListener( } else { logger.error("CONSUMER GOT an ERROR, retry limit exceeded: $message") val record = ProducerRecord( - // topic = topicDefinition.deadLetterTopic, - // key = + message.partition(), message.key(), - // value = - message.value() + message.value(), + message.headers() ).apply { headers().add("doNotFail", "true".toByteArray()) } diff --git a/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/consumers/ProductConsumer.kt b/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/consumers/ProductConsumer.kt index abf66daa..7229462d 100644 --- a/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/consumers/ProductConsumer.kt +++ b/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/consumers/ProductConsumer.kt @@ -13,6 +13,6 @@ class ProductConsumer( override val topicDefinition: TopicDefinition = TopicDefinition("product", "product.retry", "product.error") override suspend fun listen(record: ConsumerRecord) { - logger.info("Product consumed: ${record.value()}") + logger.info("Product consumed: ${record.value()} from topic: ${record.topic()} with key: ${record.key()}") } }