diff --git a/lib/stove-testing-e2e-kafka/build.gradle.kts b/lib/stove-testing-e2e-kafka/build.gradle.kts index dfaa07ad..46829a78 100644 --- a/lib/stove-testing-e2e-kafka/build.gradle.kts +++ b/lib/stove-testing-e2e-kafka/build.gradle.kts @@ -9,7 +9,6 @@ dependencies { implementation(libs.kotlinx.io.reactor.extensions) implementation(libs.kotlinx.jdk8) implementation(libs.kotlinx.core) - implementation(libs.kafkaKotlin) implementation(libs.wire.grpc.server) implementation(libs.wire.grpc.client) implementation(libs.wire.grpc.runtime) @@ -25,6 +24,7 @@ dependencies { dependencies { testImplementation(libs.logback.classic) + testImplementation(libs.kafkaKotlin) } buildscript { diff --git a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/Extensions.kt b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/Extensions.kt index f0c98fbf..821caa02 100644 --- a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/Extensions.kt +++ b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/Extensions.kt @@ -26,13 +26,13 @@ fun ConsumedMessage.offsets(): List = offsets.map { it.offset } + offset suspend inline fun KafkaProducer.dispatch( record: ProducerRecord -) = suspendCoroutine { continuation -> +): RecordMetadata = suspendCoroutine { continuation -> val callback = Callback { metadata, exception -> - if (metadata == null) { - continuation.resumeWithException(exception!!) + if (exception != null) { + continuation.resumeWithException(exception) } else { continuation.resume(metadata) } } - this.send(record, callback) + send(record, callback) } diff --git a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaSystem.kt b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaSystem.kt index 1cf7189d..a7b6555e 100644 --- a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaSystem.kt +++ b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaSystem.kt @@ -9,7 +9,6 @@ import com.trendyol.stove.testing.e2e.standalone.kafka.intercepting.* import com.trendyol.stove.testing.e2e.system.TestSystem import com.trendyol.stove.testing.e2e.system.abstractions.* import com.trendyol.stove.testing.e2e.system.annotations.StoveDsl -import io.github.nomisRev.kafka.* import io.grpc.* import kotlinx.coroutines.* import org.apache.kafka.clients.admin.* @@ -17,6 +16,8 @@ import org.apache.kafka.clients.producer.* import org.apache.kafka.common.serialization.StringSerializer import org.slf4j.* import java.util.concurrent.ScheduledThreadPoolExecutor +import kotlin.collections.component1 +import kotlin.collections.component2 import kotlin.reflect.KClass import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds @@ -158,11 +159,12 @@ class KafkaSystem( ) ) - private fun createAdminClient(exposedConfiguration: KafkaExposedConfiguration): Admin = - mapOf( - AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to exposedConfiguration.bootstrapServers, - AdminClientConfig.CLIENT_ID_CONFIG to "stove-kafka-admin-client" - ).let { Admin(AdminSettings(exposedConfiguration.bootstrapServers, it.toProperties())) } + private fun createAdminClient( + exposedConfiguration: KafkaExposedConfiguration + ): Admin = mapOf( + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to exposedConfiguration.bootstrapServers, + AdminClientConfig.CLIENT_ID_CONFIG to "stove-kafka-admin-client" + ).let { Admin.create(it.toProperties()) } private suspend fun startGrpcServer(): Server { System.setProperty(STOVE_KAFKA_BRIDGE_PORT, context.options.bridgeGrpcServerPort.toString()) @@ -181,9 +183,8 @@ class KafkaSystem( .maxInboundMetadataSize(MAX_MESSAGE_SIZE) .permitKeepAliveWithoutCalls(true) .build() - .start().also { - waitUntilHealthy(it) - } + .start() + .also { waitUntilHealthy(it, 30.seconds) } }.recover { logger.error("Failed to start Stove Message Sink Grpc Server", it) throw it @@ -193,10 +194,10 @@ class KafkaSystem( }.get() } - private suspend fun waitUntilHealthy(server: Server) { + private suspend fun waitUntilHealthy(server: Server, duration: Duration) { val client = GrpcUtils.createClient(server.port.toString()) var healthy = false - withTimeout(30.seconds) { + withTimeout(duration) { while (!healthy) { logger.info("Waiting for Stove Message Sink Grpc Server to be healthy") Try { diff --git a/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/DomainEvents.kt b/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/DomainEvents.kt index 973d3ae2..e4beabbd 100644 --- a/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/DomainEvents.kt +++ b/lib/stove-testing-e2e-kafka/src/test/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/setup/example/DomainEvents.kt @@ -4,6 +4,4 @@ object DomainEvents { data class ProductCreated(val productId: String) data class ProductFailingCreated(val productId: String) - - data class BacklogCreated(val backlogId: String) }