Skip to content

Commit

Permalink
tests
Browse files Browse the repository at this point in the history
  • Loading branch information
osoykan committed May 1, 2024
1 parent d3d615c commit 8093842
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@ class KafkaApplicationUnderTest : ApplicationUnderTest<Unit> {
private suspend fun startConsumers(bootStrapServers: String) {
val consumerSettings = mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootStrapServers,
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "true",
ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG to "500",
ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG to "1000",
ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG to "2000",
ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG to "true",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StoveKafkaValueDeserializer::class.java,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.trendyol.stove.testing.e2e.standalone.kafka.setup
package com.trendyol.stove.testing.e2e.standalone.kafka.setup.example

object DomainEvents {
data class ProductCreated(val productId: String)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import com.trendyol.stove.testing.e2e.standalone.kafka.Caching.getOrPut
import com.trendyol.stove.testing.e2e.standalone.kafka.setup.example.KafkaTestShared.TopicDefinition
import io.github.nomisRev.kafka.publisher.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.asFlow
import org.apache.kafka.clients.consumer.*
import org.apache.kafka.clients.producer.ProducerRecord
import java.time.Duration
Expand All @@ -31,20 +30,19 @@ abstract class StoveListener(
while (!consuming.isCancelled) {
consumer
.poll(Duration.ofMillis(100))
.asSequence()
.asFlow()
.collect { message ->
.forEach { message ->
logger.info("Message RECEIVED on the application side: ${message.value()}")
consume(message)
consume(message, consumer)
}
}
}
}

private suspend fun consume(message: ConsumerRecord<String, String>) {
private suspend fun consume(message: ConsumerRecord<String, String>, consumer: KafkaConsumer<String, String>) {
Try { listen(message) }
.map {
logger.info("Message COMMITTED on the application side: ${message.value()}")
consumer.commitAsync()
}
.recover {
logger.warn("CONSUMER GOT an ERROR on the application side, exception: $it")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package com.trendyol.stove.testing.e2e.standalone.kafka.tests

import arrow.core.some
import com.trendyol.stove.testing.e2e.standalone.kafka.kafka
import com.trendyol.stove.testing.e2e.standalone.kafka.setup.DomainEvents.ProductCreated
import com.trendyol.stove.testing.e2e.standalone.kafka.setup.DomainEvents.ProductFailingCreated
import com.trendyol.stove.testing.e2e.standalone.kafka.setup.example.DomainEvents.ProductCreated
import com.trendyol.stove.testing.e2e.standalone.kafka.setup.example.DomainEvents.ProductFailingCreated
import com.trendyol.stove.testing.e2e.system.TestSystem.Companion.validate
import io.kotest.core.spec.style.FunSpec
import kotlin.random.Random
Expand Down

0 comments on commit 8093842

Please sign in to comment.