diff --git a/examples/ktor-example/src/main/kotlin/stove/ktor/example/app/kafka.kt b/examples/ktor-example/src/main/kotlin/stove/ktor/example/app/kafka.kt index cb4c535d..de2d758c 100644 --- a/examples/ktor-example/src/main/kotlin/stove/ktor/example/app/kafka.kt +++ b/examples/ktor-example/src/main/kotlin/stove/ktor/example/app/kafka.kt @@ -19,14 +19,16 @@ fun kafka(): Module = module { } private fun createReceiver(config: AppConfiguration): KafkaReceiver { - val pollTimeoutSec = 1 + val pollTimeoutSec = 2 val heartbeatSec = pollTimeoutSec + 1 + val commitInterval = heartbeatSec + 1 val settings = ReceiverSettings( config.kafka.bootstrapServers, StringDeserializer(), ExampleAppKafkaValueDeserializer(), config.kafka.groupId, autoOffsetReset = AutoOffsetReset.Earliest, + commitStrategy = CommitStrategy.ByTime(commitInterval.seconds), pollTimeout = pollTimeoutSec.seconds, properties = Properties().apply { put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, config.kafka.interceptorClasses) diff --git a/examples/ktor-example/src/main/kotlin/stove/ktor/example/application/ExampleAppConsumer.kt b/examples/ktor-example/src/main/kotlin/stove/ktor/example/application/ExampleAppConsumer.kt index 2ed896e2..b9cb49db 100644 --- a/examples/ktor-example/src/main/kotlin/stove/ktor/example/application/ExampleAppConsumer.kt +++ b/examples/ktor-example/src/main/kotlin/stove/ktor/example/application/ExampleAppConsumer.kt @@ -2,12 +2,10 @@ package stove.ktor.example.application import io.github.nomisRev.kafka.receiver.KafkaReceiver import kotlinx.coroutines.* -import kotlinx.coroutines.flow.flattenConcat -import kotlinx.coroutines.flow.onEach -import org.apache.kafka.clients.consumer.* +import kotlinx.coroutines.flow.* +import org.apache.kafka.clients.consumer.ConsumerRecord import stove.ktor.example.app.AppConfiguration -@OptIn(DelicateCoroutinesApi::class) class ExampleAppConsumer( config: AppConfiguration, private val kafkaReceiver: KafkaReceiver @@ -20,14 +18,12 @@ class ExampleAppConsumer( kafkaReceiver .receiveAutoAck(topics) .flattenConcat() - .onEach(::consume) + .collect(::consume) } private fun consume(message: ConsumerRecord) { println("Consumed message: $message") } - fun stop() { - scope.cancel() - } + fun stop() = scope.cancel() } diff --git a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaSystem.kt b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaSystem.kt index 4ad3fa6a..6de215c6 100644 --- a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaSystem.kt +++ b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/KafkaSystem.kt @@ -15,7 +15,6 @@ import org.apache.kafka.clients.admin.* import org.apache.kafka.clients.producer.* import org.apache.kafka.common.serialization.StringSerializer import org.slf4j.* -import java.util.concurrent.Executor import kotlin.collections.component1 import kotlin.collections.component2 import kotlin.reflect.KClass @@ -25,6 +24,7 @@ import kotlin.time.Duration.Companion.seconds var stoveKafkaObjectMapperRef: ObjectMapper = StoveObjectMapper.Default var stoveKafkaBridgePortDefault = "50051" const val STOVE_KAFKA_BRIDGE_PORT = "STOVE_KAFKA_BRIDGE_PORT" +internal val StoveKafkaCoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob()) @StoveDsl class KafkaSystem( @@ -35,7 +35,6 @@ class KafkaSystem( private lateinit var adminClient: Admin private lateinit var kafkaPublisher: KafkaProducer private lateinit var grpcServer: Server - private val grpcServerScope = CoroutineScope(Dispatchers.IO + SupervisorJob()) @PublishedApi internal lateinit var sink: TestSystemMessageSink @@ -182,7 +181,7 @@ class KafkaSystem( System.setProperty(STOVE_KAFKA_BRIDGE_PORT, context.options.bridgeGrpcServerPort.toString()) return Try { ServerBuilder.forPort(context.options.bridgeGrpcServerPort) - .executor(StoveCoroutineExecutor(grpcServerScope.also { it.ensureActive() })) + .executor(StoveKafkaCoroutineScope.also { it.ensureActive() }.asExecutor) .addService(StoveKafkaObserverGrpcServer(sink)) .handshakeTimeout(GRPC_TIMEOUT_IN_SECONDS, java.util.concurrent.TimeUnit.SECONDS) .permitKeepAliveTime(GRPC_TIMEOUT_IN_SECONDS, java.util.concurrent.TimeUnit.SECONDS) @@ -207,7 +206,7 @@ class KafkaSystem( } private suspend fun waitUntilHealthy(server: Server, duration: Duration) { - val client = GrpcUtils.createClient(server.port.toString()) + val client = GrpcUtils.createClient(server.port.toString(), StoveKafkaCoroutineScope) var healthy = false withTimeout(duration) { while (!healthy) { @@ -235,20 +234,9 @@ class KafkaSystem( override fun close(): Unit = runBlocking { Try { grpcServer.shutdownNow() - grpcServerScope.cancel() + StoveKafkaCoroutineScope.cancel() kafkaPublisher.close() executeWithReuseCheck { stop() } } }.recover { logger.warn("got an error while stopping: ${it.message}") }.let { } } - -private class StoveCoroutineExecutor(private val scope: CoroutineScope) : Executor { - private val logger: Logger = LoggerFactory.getLogger(javaClass) - - override fun execute(command: Runnable) { - scope.launch { - Either.catch { command.run() } - .mapLeft { logger.warn("got an error while executing command", it) } - } - } -} diff --git a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/coroutines.kt b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/coroutines.kt new file mode 100644 index 00000000..179e20ef --- /dev/null +++ b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/coroutines.kt @@ -0,0 +1,45 @@ +package com.trendyol.stove.testing.e2e.standalone.kafka + +import kotlinx.coroutines.* +import java.util.concurrent.* + +val CoroutineScope.asExecutor: Executor + get() = StoveCoroutineExecutor(this) + +val CoroutineScope.asExecutorService: ExecutorService + get() = CoroutineExecutorService(this) + +internal class CoroutineExecutorService(private val coroutineScope: CoroutineScope) : AbstractExecutorService() { + override fun execute(command: Runnable) { + coroutineScope.launch { command.run() } + } + + override fun shutdown() { + coroutineScope.cancel() + } + + override fun shutdownNow(): List { + coroutineScope.cancel() + return emptyList() + } + + override fun isShutdown(): Boolean { + return coroutineScope.coroutineContext[Job]?.isCancelled ?: true + } + + override fun isTerminated(): Boolean { + return coroutineScope.coroutineContext[Job]?.isCompleted ?: true + } + + override fun awaitTermination(timeout: Long, unit: TimeUnit): Boolean { + // Coroutine jobs don't support await termination out of the box + // This is a simplified implementation + return isTerminated + } +} + +internal class StoveCoroutineExecutor(private val scope: CoroutineScope) : Executor { + override fun execute(command: Runnable) { + scope.launch { command.run() } + } +} diff --git a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/GrpcUtils.kt b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/GrpcUtils.kt index ac3602a9..a8e1f79d 100644 --- a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/GrpcUtils.kt +++ b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/GrpcUtils.kt @@ -1,24 +1,26 @@ package com.trendyol.stove.testing.e2e.standalone.kafka.intercepting import com.squareup.wire.* -import com.trendyol.stove.testing.e2e.standalone.kafka.StoveKafkaObserverServiceClient +import com.trendyol.stove.testing.e2e.standalone.kafka.* +import kotlinx.coroutines.CoroutineScope import okhttp3.* import kotlin.time.Duration.Companion.seconds import kotlin.time.toJavaDuration object GrpcUtils { - private val getClient = { + private val getClient = { scope: CoroutineScope -> OkHttpClient.Builder() .protocols(listOf(Protocol.H2_PRIOR_KNOWLEDGE)) .callTimeout(30.seconds.toJavaDuration()) .readTimeout(30.seconds.toJavaDuration()) .writeTimeout(30.seconds.toJavaDuration()) .connectTimeout(30.seconds.toJavaDuration()) + .dispatcher(Dispatcher(scope.asExecutorService)) .build() } - fun createClient(onPort: String): StoveKafkaObserverServiceClient = GrpcClient.Builder() - .client(getClient()) + fun createClient(onPort: String, scope: CoroutineScope): StoveKafkaObserverServiceClient = GrpcClient.Builder() + .client(getClient(scope)) .baseUrl("http://0.0.0.0:$onPort".toHttpUrl()) .build() .create() diff --git a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/StoveKafkaBridge.kt b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/StoveKafkaBridge.kt index 5fec2900..db0519b4 100644 --- a/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/StoveKafkaBridge.kt +++ b/lib/stove-testing-e2e-kafka/src/main/kotlin/com/trendyol/stove/testing/e2e/standalone/kafka/intercepting/StoveKafkaBridge.kt @@ -3,10 +3,6 @@ package com.trendyol.stove.testing.e2e.standalone.kafka.intercepting import com.fasterxml.jackson.databind.ObjectMapper import com.trendyol.stove.functional.* import com.trendyol.stove.testing.e2e.standalone.kafka.* -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.cancel import kotlinx.coroutines.runBlocking import org.apache.kafka.clients.consumer.* import org.apache.kafka.clients.producer.* @@ -20,23 +16,22 @@ class StoveKafkaBridge : ConsumerInterceptor, ProducerInterceptor): ProducerRecord = runBlocking(scope.coroutineContext) { + override fun onSend(record: ProducerRecord): ProducerRecord = runBlocking { record.also { send(publishedMessage(it)) } } - override fun onConsume(records: ConsumerRecords): ConsumerRecords = runBlocking(scope.coroutineContext) { + override fun onConsume(records: ConsumerRecords): ConsumerRecords = runBlocking { records.also { consumedMessages(it).forEach { message -> send(message) } } } - override fun onCommit(offsets: MutableMap) = runBlocking(scope.coroutineContext) { + override fun onCommit(offsets: MutableMap) = runBlocking { committedMessages(offsets).forEach { send(it) } } override fun configure(configs: MutableMap) = Unit - override fun close() = scope.cancel() + override fun close() = Unit override fun onAcknowledgement(metadata: RecordMetadata, exception: Exception) = Unit @@ -111,7 +106,7 @@ class StoveKafkaBridge : ConsumerInterceptor, ProducerInterceptor