Skip to content

Commit

Permalink
fix reference issues for compile time
Browse files Browse the repository at this point in the history
  • Loading branch information
Steffen Lien committed Feb 3, 2022
1 parent cdeb740 commit 921a7cb
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,19 @@ import java.util.function.Consumer
class KafkaFactory(private val db: DatabaseFactory) {

private val logger = LoggerFactory.getLogger(KafkaFactory::class.java)
private val appConfig = HoconApplicationConfig(ConfigFactory.load())
private val kafkaConfig = HoconApplicationConfig(ConfigFactory.load()).config("ktor.kafka")
private val consumerClient: KafkaConsumerClient

init {
logger.debug("Initializing KafkaFactory.")

val topics = getConsumerTopics()
val consumerProperties = configureProperties()
val topics = configureTopics()
val consumerTopics = configureConsumersTopics(topics)

consumerClient = KafkaConsumerClientBuilder.builder()
.withProperties(consumerProperties)
.withTopicConfigs(topics)
.withTopicConfigs(consumerTopics)
.build()

consumerClient.start()
Expand All @@ -41,9 +42,11 @@ class KafkaFactory(private val db: DatabaseFactory) {
}

private fun configureProperties(): Properties {
val consumerGroupId = appConfig.property("ktor.kafka.consumerGroupId").getString()
val kafkaBrokers = appConfig.property("ktor.kafka.kafkaBrokers").getString()
return if (appConfig.property("ktor.localDevelopment").getString() == "true") {
val consumerGroupId = kafkaConfig.property("consumerGroupId").getString()
val kafkaBrokers = kafkaConfig.property("kafkaBrokers").getString()
val isLocalDevelopment = HoconApplicationConfig(ConfigFactory.load()).property("ktor.localDevelopment").getString()
// // TODO: Discuss if we really need a local setup of kafka or not
return if (isLocalDevelopment == "true") {
KafkaPropertiesBuilder.consumerBuilder()
.withBrokerUrl(kafkaBrokers)
.withBaseProperties()
Expand All @@ -55,12 +58,14 @@ class KafkaFactory(private val db: DatabaseFactory) {
}
}

private fun configureTopics(): List<KafkaConsumerClientBuilder.TopicConfig<String, String>> {
return KafkaTopics.values().map { it ->
private fun getConsumerTopics() = kafkaConfig.config("topics").property("consumer").getList()

private fun configureConsumersTopics(topics: List<String>): List<KafkaConsumerClientBuilder.TopicConfig<String, String>> {
return topics.map { topic ->
KafkaConsumerClientBuilder.TopicConfig<String, String>()
.withLogging()
.withConsumerConfig(
it.topic,
topic,
stringDeserializer(),
stringDeserializer(),
Consumer<ConsumerRecord<String, String>> { logTopicContent(it) }
Expand Down

This file was deleted.

12 changes: 7 additions & 5 deletions backend/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ ktor {
kafkaBrokers = "localhost:9092"
kafkaBrokers = ${?KAFKA_BROKERS}
topics {
tiltaksgjennomforingEndret = "teamarenanais.aapen-arena-tiltakgjennomforingendret-v1-q2"
tiltaksdeltakerEndret = "teamarenanais.aapen-arena-tiltakdeltakerendret-v1-q2"
tiltaksgruppeEndret = "teamarenanais.aapen-arena-tiltaksgruppeendret-v1-q2"
tiltakEndret = "teamarenanais.aapen-arena-tiltakendret-v1-q2"
avtaleinfoEndret = "teamarenanais.aapen-arena-avtaleinfoendret-v1-q2"
consumer = [
teamarenanais.aapen-arena-tiltakgjennomforingendret-v1-q2,
teamarenanais.aapen-arena-tiltakdeltakerendret-v1-q2,
teamarenanais.aapen-arena-tiltaksgruppeendret-v1-q2,
teamarenanais.aapen-arena-tiltakendret-v1-q2,
teamarenanais.aapen-arena-avtaleinfoendret-v1-q2,
]
}
}
}

0 comments on commit 921a7cb

Please sign in to comment.