use producer from base kafka client
osoykan committed May 1, 2024
1 parent 05badf5 commit 4493015
Showing 1 changed file with 24 additions and 9 deletions.
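The diff below replaces io.github.nomisRev.kafka's KafkaPublisher with the plain KafkaProducer from kafka-clients and bridges the producer's asynchronous send callback back into the calling coroutine through a child Job. A minimal, self-contained sketch of that bridge, assuming only the standard kafka-clients and kotlinx-coroutines APIs (the sendAwait helper, topic name, and broker address are illustrative, not part of the commit):

import kotlinx.coroutines.*
import org.apache.kafka.clients.producer.*
import org.apache.kafka.common.serialization.StringSerializer
import kotlin.coroutines.coroutineContext

// Suspend until the broker acknowledges (or rejects) the record, mirroring the
// Job + Callback bridge introduced in this commit.
suspend fun KafkaProducer<String, String>.sendAwait(record: ProducerRecord<String, String>) {
  val ack = Job(coroutineContext.job) // child of the caller, so a failure propagates to it
  send(record) { _, exception ->      // KafkaProducer invokes this callback asynchronously
    if (exception != null) ack.completeExceptionally(exception) else ack.complete()
  }
  ack.join()                          // resume once the callback has fired
}

fun main() = runBlocking {
  // Hypothetical broker address and topic; adjust to your environment.
  val producer = KafkaProducer<String, String>(
    mapOf(
      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
      ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java.name,
      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java.name
    )
  )
  producer.use { it.sendAwait(ProducerRecord("orders", "key-1", "hello")) }
}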
@@ -10,14 +10,14 @@ import com.trendyol.stove.testing.e2e.system.TestSystem
 import com.trendyol.stove.testing.e2e.system.abstractions.*
 import com.trendyol.stove.testing.e2e.system.annotations.StoveDsl
 import io.github.nomisRev.kafka.*
-import io.github.nomisRev.kafka.publisher.*
 import io.grpc.*
 import kotlinx.coroutines.*
 import org.apache.kafka.clients.admin.*
-import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.*
 import org.apache.kafka.common.serialization.StringSerializer
 import org.slf4j.*
 import java.util.concurrent.ScheduledThreadPoolExecutor
+import kotlin.coroutines.coroutineContext
 import kotlin.reflect.KClass
 import kotlin.time.Duration
 import kotlin.time.Duration.Companion.seconds
@@ -33,7 +33,7 @@ class KafkaSystem(
 ) : PluggedSystem, ExposesConfiguration, RunAware, AfterRunAware {
   private lateinit var exposedConfiguration: KafkaExposedConfiguration
   private lateinit var adminClient: Admin
-  private lateinit var kafkaPublisher: KafkaPublisher<String, Any>
+  private lateinit var kafkaPublisher: KafkaProducer<String, Any>
   private lateinit var grpcServer: Server
   private val grpcServerExecutor = ScheduledThreadPoolExecutor(1)

@@ -75,7 +75,16 @@
     val record = ProducerRecord<String, Any>(topic, partition, key.getOrNull(), message)
     headers.forEach { (k, v) -> record.headers().add(k, v.toByteArray()) }
     testCase.map { record.headers().add("testCase", it.toByteArray()) }
-    kafkaPublisher.publishScope { offer(record) }
+    val scope = Job(coroutineContext.job)
+    val callback = Callback { _, exception ->
+      if (exception != null) {
+        scope.completeExceptionally(exception)
+      } else {
+        scope.complete()
+      }
+    }
+    kafkaPublisher.send(record, callback)
+    scope.join()
     return this
   }

@@ -147,11 +156,17 @@
     condition: (message: ParsedMessage<T>) -> Boolean
   ): Unit = coroutineScope { sink.waitUntilRetried(atLeastIn, times, clazz, condition) }

-  private fun createPublisher(exposedConfiguration: KafkaExposedConfiguration): KafkaPublisher<String, Any> = PublisherSettings(
-    exposedConfiguration.bootstrapServers,
-    StringSerializer(),
-    StoveKafkaValueSerializer()
-  ).let { KafkaPublisher(it) }
+  private fun createPublisher(
+    exposedConfiguration: KafkaExposedConfiguration
+  ): KafkaProducer<String, Any> = KafkaProducer(
+    mapOf(
+      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to exposedConfiguration.bootstrapServers,
+      ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java.name,
+      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StoveKafkaValueSerializer::class.java.name,
+      ProducerConfig.CLIENT_ID_CONFIG to "stove-kafka-producer",
+      ProducerConfig.ACKS_CONFIG to "1"
+    )
+  )

   private fun createAdminClient(exposedConfiguration: KafkaExposedConfiguration): Admin =
     mapOf<String, Any>(
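As a design note, not part of this commit: the same callback-to-coroutine bridge is often written with suspendCancellableCoroutine, which rethrows the failure at the call site and returns the broker's RecordMetadata instead of completing a child Job. A sketch under the same kafka-clients and kotlinx-coroutines assumptions:

import kotlinx.coroutines.suspendCancellableCoroutine
import org.apache.kafka.clients.producer.*
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Resume the suspended caller from the producer callback instead of completing a Job;
// the send's RecordMetadata becomes the function's return value.
suspend fun <K, V> KafkaProducer<K, V>.sendSuspending(record: ProducerRecord<K, V>): RecordMetadata =
  suspendCancellableCoroutine { cont ->
    send(record) { metadata, exception ->
      if (exception != null) cont.resumeWithException(exception) else cont.resume(metadata)
    }
  }

The Job-based variant in the diff instead relies on structured concurrency: completing the child Job exceptionally cancels the calling coroutine with the send failure.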
