Skip to content

Commit

Permalink
simplify admin client creation
Browse files Browse the repository at this point in the history
  • Loading branch information
osoykan committed May 1, 2024
1 parent 2edcd19 commit 339f967
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 18 deletions.
2 changes: 1 addition & 1 deletion lib/stove-testing-e2e-kafka/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ dependencies {
implementation(libs.kotlinx.io.reactor.extensions)
implementation(libs.kotlinx.jdk8)
implementation(libs.kotlinx.core)
implementation(libs.kafkaKotlin)
implementation(libs.wire.grpc.server)
implementation(libs.wire.grpc.client)
implementation(libs.wire.grpc.runtime)
Expand All @@ -25,6 +24,7 @@ dependencies {

dependencies {
testImplementation(libs.logback.classic)
testImplementation(libs.kafkaKotlin)
}

buildscript {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ fun ConsumedMessage.offsets(): List<Long> = offsets.map { it.offset } + offset

suspend inline fun <reified K : Any, reified V : Any> KafkaProducer<K, V>.dispatch(
record: ProducerRecord<K, V>
) = suspendCoroutine { continuation ->
): RecordMetadata = suspendCoroutine { continuation ->
val callback = Callback { metadata, exception ->
if (metadata == null) {
continuation.resumeWithException(exception!!)
if (exception != null) {
continuation.resumeWithException(exception)
} else {
continuation.resume(metadata)
}
}
this.send(record, callback)
send(record, callback)
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ import com.trendyol.stove.testing.e2e.standalone.kafka.intercepting.*
import com.trendyol.stove.testing.e2e.system.TestSystem
import com.trendyol.stove.testing.e2e.system.abstractions.*
import com.trendyol.stove.testing.e2e.system.annotations.StoveDsl
import io.github.nomisRev.kafka.*
import io.grpc.*
import kotlinx.coroutines.*
import org.apache.kafka.clients.admin.*
import org.apache.kafka.clients.producer.*
import org.apache.kafka.common.serialization.StringSerializer
import org.slf4j.*
import java.util.concurrent.ScheduledThreadPoolExecutor
import kotlin.collections.component1
import kotlin.collections.component2
import kotlin.reflect.KClass
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
Expand Down Expand Up @@ -158,11 +159,12 @@ class KafkaSystem(
)
)

private fun createAdminClient(exposedConfiguration: KafkaExposedConfiguration): Admin =
mapOf<String, Any>(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to exposedConfiguration.bootstrapServers,
AdminClientConfig.CLIENT_ID_CONFIG to "stove-kafka-admin-client"
).let { Admin(AdminSettings(exposedConfiguration.bootstrapServers, it.toProperties())) }
private fun createAdminClient(
exposedConfiguration: KafkaExposedConfiguration
): Admin = mapOf<String, Any>(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to exposedConfiguration.bootstrapServers,
AdminClientConfig.CLIENT_ID_CONFIG to "stove-kafka-admin-client"
).let { Admin.create(it.toProperties()) }

private suspend fun startGrpcServer(): Server {
System.setProperty(STOVE_KAFKA_BRIDGE_PORT, context.options.bridgeGrpcServerPort.toString())
Expand All @@ -181,9 +183,8 @@ class KafkaSystem(
.maxInboundMetadataSize(MAX_MESSAGE_SIZE)
.permitKeepAliveWithoutCalls(true)
.build()
.start().also {
waitUntilHealthy(it)
}
.start()
.also { waitUntilHealthy(it, 30.seconds) }
}.recover {
logger.error("Failed to start Stove Message Sink Grpc Server", it)
throw it
Expand All @@ -193,10 +194,10 @@ class KafkaSystem(
}.get()
}

private suspend fun waitUntilHealthy(server: Server) {
private suspend fun waitUntilHealthy(server: Server, duration: Duration) {
val client = GrpcUtils.createClient(server.port.toString())
var healthy = false
withTimeout(30.seconds) {
withTimeout(duration) {
while (!healthy) {
logger.info("Waiting for Stove Message Sink Grpc Server to be healthy")
Try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,4 @@ object DomainEvents {
data class ProductCreated(val productId: String)

data class ProductFailingCreated(val productId: String)

data class BacklogCreated(val backlogId: String)
}

0 comments on commit 339f967

Please sign in to comment.