Skip to content

Commit

Permalink
(kafka-standalone: common corotuine handling for kafka)
Browse files Browse the repository at this point in the history
  • Loading branch information
osoykan committed May 22, 2024
1 parent 0a8d3d3 commit f4f96cb
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ fun kafka(): Module = module {
}

private fun <V : Any> createReceiver(config: AppConfiguration): KafkaReceiver<String, V> {
val pollTimeoutSec = 1
val pollTimeoutSec = 2
val heartbeatSec = pollTimeoutSec + 1
val commitInterval = heartbeatSec + 1
val settings = ReceiverSettings(
config.kafka.bootstrapServers,
StringDeserializer(),
ExampleAppKafkaValueDeserializer<V>(),
config.kafka.groupId,
autoOffsetReset = AutoOffsetReset.Earliest,
commitStrategy = CommitStrategy.ByTime(commitInterval.seconds),
pollTimeout = pollTimeoutSec.seconds,
properties = Properties().apply {
put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, config.kafka.interceptorClasses)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ package stove.ktor.example.application

import io.github.nomisRev.kafka.receiver.KafkaReceiver
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.flattenConcat
import kotlinx.coroutines.flow.onEach
import org.apache.kafka.clients.consumer.*
import kotlinx.coroutines.flow.*
import org.apache.kafka.clients.consumer.ConsumerRecord
import stove.ktor.example.app.AppConfiguration

@OptIn(DelicateCoroutinesApi::class)
class ExampleAppConsumer<K, V>(
config: AppConfiguration,
private val kafkaReceiver: KafkaReceiver<K, V>
Expand All @@ -20,14 +18,12 @@ class ExampleAppConsumer<K, V>(
kafkaReceiver
.receiveAutoAck(topics)
.flattenConcat()
.onEach(::consume)
.collect(::consume)
}

private fun consume(message: ConsumerRecord<K, V>) {
println("Consumed message: $message")
}

fun stop() {
scope.cancel()
}
fun stop() = scope.cancel()
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import org.apache.kafka.clients.admin.*
import org.apache.kafka.clients.producer.*
import org.apache.kafka.common.serialization.StringSerializer
import org.slf4j.*
import java.util.concurrent.Executor
import kotlin.collections.component1
import kotlin.collections.component2
import kotlin.reflect.KClass
Expand All @@ -25,6 +24,7 @@ import kotlin.time.Duration.Companion.seconds
var stoveKafkaObjectMapperRef: ObjectMapper = StoveObjectMapper.Default
var stoveKafkaBridgePortDefault = "50051"
const val STOVE_KAFKA_BRIDGE_PORT = "STOVE_KAFKA_BRIDGE_PORT"
internal val StoveKafkaCoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob())

@StoveDsl
class KafkaSystem(
Expand All @@ -35,7 +35,6 @@ class KafkaSystem(
private lateinit var adminClient: Admin
private lateinit var kafkaPublisher: KafkaProducer<String, Any>
private lateinit var grpcServer: Server
private val grpcServerScope = CoroutineScope(Dispatchers.IO + SupervisorJob())

@PublishedApi
internal lateinit var sink: TestSystemMessageSink
Expand Down Expand Up @@ -182,7 +181,7 @@ class KafkaSystem(
System.setProperty(STOVE_KAFKA_BRIDGE_PORT, context.options.bridgeGrpcServerPort.toString())
return Try {
ServerBuilder.forPort(context.options.bridgeGrpcServerPort)
.executor(StoveCoroutineExecutor(grpcServerScope.also { it.ensureActive() }))
.executor(StoveKafkaCoroutineScope.also { it.ensureActive() }.asExecutor)
.addService(StoveKafkaObserverGrpcServer(sink))
.handshakeTimeout(GRPC_TIMEOUT_IN_SECONDS, java.util.concurrent.TimeUnit.SECONDS)
.permitKeepAliveTime(GRPC_TIMEOUT_IN_SECONDS, java.util.concurrent.TimeUnit.SECONDS)
Expand All @@ -207,7 +206,7 @@ class KafkaSystem(
}

private suspend fun waitUntilHealthy(server: Server, duration: Duration) {
val client = GrpcUtils.createClient(server.port.toString())
val client = GrpcUtils.createClient(server.port.toString(), StoveKafkaCoroutineScope)
var healthy = false
withTimeout(duration) {
while (!healthy) {
Expand Down Expand Up @@ -235,20 +234,9 @@ class KafkaSystem(
override fun close(): Unit = runBlocking {
Try {
grpcServer.shutdownNow()
grpcServerScope.cancel()
StoveKafkaCoroutineScope.cancel()
kafkaPublisher.close()
executeWithReuseCheck { stop() }
}
}.recover { logger.warn("got an error while stopping: ${it.message}") }.let { }
}

private class StoveCoroutineExecutor(private val scope: CoroutineScope) : Executor {
private val logger: Logger = LoggerFactory.getLogger(javaClass)

override fun execute(command: Runnable) {
scope.launch {
Either.catch { command.run() }
.mapLeft { logger.warn("got an error while executing command", it) }
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.trendyol.stove.testing.e2e.standalone.kafka

import kotlinx.coroutines.*
import java.util.concurrent.*

val CoroutineScope.asExecutor: Executor
get() = StoveCoroutineExecutor(this)

val CoroutineScope.asExecutorService: ExecutorService
get() = CoroutineExecutorService(this)

internal class CoroutineExecutorService(private val coroutineScope: CoroutineScope) : AbstractExecutorService() {
override fun execute(command: Runnable) {
coroutineScope.launch { command.run() }
}

override fun shutdown() {
coroutineScope.cancel()
}

override fun shutdownNow(): List<Runnable> {
coroutineScope.cancel()
return emptyList()
}

override fun isShutdown(): Boolean {
return coroutineScope.coroutineContext[Job]?.isCancelled ?: true
}

override fun isTerminated(): Boolean {
return coroutineScope.coroutineContext[Job]?.isCompleted ?: true
}

override fun awaitTermination(timeout: Long, unit: TimeUnit): Boolean {
// Coroutine jobs don't support await termination out of the box
// This is a simplified implementation
return isTerminated
}
}

internal class StoveCoroutineExecutor(private val scope: CoroutineScope) : Executor {
override fun execute(command: Runnable) {
scope.launch { command.run() }
}
}
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
package com.trendyol.stove.testing.e2e.standalone.kafka.intercepting

import com.squareup.wire.*
import com.trendyol.stove.testing.e2e.standalone.kafka.StoveKafkaObserverServiceClient
import com.trendyol.stove.testing.e2e.standalone.kafka.*
import kotlinx.coroutines.CoroutineScope
import okhttp3.*
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration

object GrpcUtils {
private val getClient = {
private val getClient = { scope: CoroutineScope ->
OkHttpClient.Builder()
.protocols(listOf(Protocol.H2_PRIOR_KNOWLEDGE))
.callTimeout(30.seconds.toJavaDuration())
.readTimeout(30.seconds.toJavaDuration())
.writeTimeout(30.seconds.toJavaDuration())
.connectTimeout(30.seconds.toJavaDuration())
.dispatcher(Dispatcher(scope.asExecutorService))
.build()
}

fun createClient(onPort: String): StoveKafkaObserverServiceClient = GrpcClient.Builder()
.client(getClient())
fun createClient(onPort: String, scope: CoroutineScope): StoveKafkaObserverServiceClient = GrpcClient.Builder()
.client(getClient(scope))
.baseUrl("http://0.0.0.0:$onPort".toHttpUrl())
.build()
.create<StoveKafkaObserverServiceClient>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@ package com.trendyol.stove.testing.e2e.standalone.kafka.intercepting
import com.fasterxml.jackson.databind.ObjectMapper
import com.trendyol.stove.functional.*
import com.trendyol.stove.testing.e2e.standalone.kafka.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.consumer.*
import org.apache.kafka.clients.producer.*
Expand All @@ -20,23 +16,22 @@ class StoveKafkaBridge<K, V> : ConsumerInterceptor<K, V>, ProducerInterceptor<K,
private val logger: Logger = org.slf4j.LoggerFactory.getLogger(StoveKafkaBridge::class.java)
private val client: StoveKafkaObserverServiceClient by lazy { startGrpcClient() }
private val mapper: ObjectMapper by lazy { stoveKafkaObjectMapperRef }
private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob())

override fun onSend(record: ProducerRecord<K, V>): ProducerRecord<K, V> = runBlocking(scope.coroutineContext) {
override fun onSend(record: ProducerRecord<K, V>): ProducerRecord<K, V> = runBlocking {
record.also { send(publishedMessage(it)) }
}

override fun onConsume(records: ConsumerRecords<K, V>): ConsumerRecords<K, V> = runBlocking(scope.coroutineContext) {
override fun onConsume(records: ConsumerRecords<K, V>): ConsumerRecords<K, V> = runBlocking {
records.also { consumedMessages(it).forEach { message -> send(message) } }
}

override fun onCommit(offsets: MutableMap<TopicPartition, OffsetAndMetadata>) = runBlocking(scope.coroutineContext) {
override fun onCommit(offsets: MutableMap<TopicPartition, OffsetAndMetadata>) = runBlocking {
committedMessages(offsets).forEach { send(it) }
}

override fun configure(configs: MutableMap<String, *>) = Unit

override fun close() = scope.cancel()
override fun close() = Unit

override fun onAcknowledgement(metadata: RecordMetadata, exception: Exception) = Unit

Expand Down Expand Up @@ -111,7 +106,7 @@ class StoveKafkaBridge<K, V> : ConsumerInterceptor<K, V>, ProducerInterceptor<K,
private fun startGrpcClient(): StoveKafkaObserverServiceClient {
val onPort = System.getenv(STOVE_KAFKA_BRIDGE_PORT) ?: stoveKafkaBridgePortDefault
logger.info("Connecting to Stove Kafka Bridge on port $onPort")
return Try { GrpcUtils.createClient(onPort) }
return Try { GrpcUtils.createClient(onPort, StoveKafkaCoroutineScope) }
.map {
logger.info("Stove Kafka Observer Client created on port $onPort")
it
Expand Down

0 comments on commit f4f96cb

Please sign in to comment.