From 1bdb6d1706131b136cbe64c4fce6fc8a41fed8e3 Mon Sep 17 00:00:00 2001 From: Oguzhan Soykan Date: Wed, 1 May 2024 16:43:16 +0200 Subject: [PATCH] fix stovelistener for testing --- .../e2e/standalone/kafka/setup/example/StoveListener.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/StoveListener.kt b/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/StoveListener.kt index 05146614..f15dcff2 100644 --- a/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/StoveListener.kt +++ b/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/StoveListener.kt @@ -25,7 +25,7 @@ abstract class StoveListener( suspend fun start() { consumer.subscribe(listOf(topicDefinition.topic, topicDefinition.retryTopic, topicDefinition.deadLetterTopic)) consuming = GlobalScope.launch { - while (!consuming.isCancelled) { + while (!this.isActive) { consumer .poll(Duration.ofMillis(100)) .forEach { message -> @@ -84,7 +84,7 @@ abstract class StoveListener( abstract suspend fun listen(record: ConsumerRecord) - override fun close() { - Try { consuming.cancel() } + override fun close(): Unit = runBlocking { + Try { consuming.cancelAndJoin() } } }