diff --git a/backend/src/main/kotlin/no/nav/mulighetsrommet/api/kafka/KafkaFactory.kt b/backend/src/main/kotlin/no/nav/mulighetsrommet/api/kafka/KafkaFactory.kt index 040b7e8fce..749ba97690 100644 --- a/backend/src/main/kotlin/no/nav/mulighetsrommet/api/kafka/KafkaFactory.kt +++ b/backend/src/main/kotlin/no/nav/mulighetsrommet/api/kafka/KafkaFactory.kt @@ -17,18 +17,19 @@ import java.util.function.Consumer class KafkaFactory(private val db: DatabaseFactory) { private val logger = LoggerFactory.getLogger(KafkaFactory::class.java) - private val appConfig = HoconApplicationConfig(ConfigFactory.load()) + private val kafkaConfig = HoconApplicationConfig(ConfigFactory.load()).config("ktor.kafka") private val consumerClient: KafkaConsumerClient init { logger.debug("Initializing KafkaFactory.") + val topics = getConsumerTopics() val consumerProperties = configureProperties() - val topics = configureTopics() + val consumerTopics = configureConsumersTopics(topics) consumerClient = KafkaConsumerClientBuilder.builder() .withProperties(consumerProperties) - .withTopicConfigs(topics) + .withTopicConfigs(consumerTopics) .build() consumerClient.start() @@ -41,9 +42,11 @@ class KafkaFactory(private val db: DatabaseFactory) { } private fun configureProperties(): Properties { - val consumerGroupId = appConfig.property("ktor.kafka.consumerGroupId").getString() - val kafkaBrokers = appConfig.property("ktor.kafka.kafkaBrokers").getString() - return if (appConfig.property("ktor.localDevelopment").getString() == "true") { + val consumerGroupId = kafkaConfig.property("consumerGroupId").getString() + val kafkaBrokers = kafkaConfig.property("kafkaBrokers").getString() + val isLocalDevelopment = HoconApplicationConfig(ConfigFactory.load()).property("ktor.localDevelopment").getString() + // // TODO: Discuss if we really need a local setup of kafka or not + return if (isLocalDevelopment == "true") { KafkaPropertiesBuilder.consumerBuilder() .withBrokerUrl(kafkaBrokers) .withBaseProperties() @@ -55,12 +58,14 @@ class KafkaFactory(private val db: DatabaseFactory) { } } - private fun configureTopics(): List> { - return KafkaTopics.values().map { it -> + private fun getConsumerTopics() = kafkaConfig.config("topics").property("consumer").getList() + + private fun configureConsumersTopics(topics: List): List> { + return topics.map { topic -> KafkaConsumerClientBuilder.TopicConfig() .withLogging() .withConsumerConfig( - it.topic, + topic, stringDeserializer(), stringDeserializer(), Consumer> { logTopicContent(it) } diff --git a/backend/src/main/kotlin/no/nav/mulighetsrommet/api/kafka/KafkaTopics.kt b/backend/src/main/kotlin/no/nav/mulighetsrommet/api/kafka/KafkaTopics.kt deleted file mode 100644 index 294b04b7ff..0000000000 --- a/backend/src/main/kotlin/no/nav/mulighetsrommet/api/kafka/KafkaTopics.kt +++ /dev/null @@ -1,12 +0,0 @@ -package no.nav.mulighetsrommet.api.kafka - -import com.typesafe.config.ConfigFactory -import io.ktor.config.HoconApplicationConfig - -enum class KafkaTopics(val topic: String) { - TiltaksgjennomforingEndret(HoconApplicationConfig(ConfigFactory.load()).property("ktor.kafka.topics.tiltaksgjennomforingEndret").toString()), - TiltaksdeltakerEndret(HoconApplicationConfig(ConfigFactory.load()).property("ktor.kafka.topics.tiltaksdeltakerEndret").toString()), - TiltaksgruppeEndret(HoconApplicationConfig(ConfigFactory.load()).property("ktor.kafka.topics.tiltaksgruppeEndret").toString()), - TiltakEndret(HoconApplicationConfig(ConfigFactory.load()).property("ktor.kafka.topics.tiltakEndret").toString()), - AvtaleinfoEndret(HoconApplicationConfig(ConfigFactory.load()).property("ktor.kafka.topics.avtaleinfoEndret").toString()) -} diff --git a/backend/src/main/resources/application.conf b/backend/src/main/resources/application.conf index 424aaa095f..9eb0dde489 100644 --- a/backend/src/main/resources/application.conf +++ b/backend/src/main/resources/application.conf @@ -27,11 +27,13 @@ ktor { kafkaBrokers = "localhost:9092" kafkaBrokers = ${?KAFKA_BROKERS} topics { - tiltaksgjennomforingEndret = "teamarenanais.aapen-arena-tiltakgjennomforingendret-v1-q2" - tiltaksdeltakerEndret = "teamarenanais.aapen-arena-tiltakdeltakerendret-v1-q2" - tiltaksgruppeEndret = "teamarenanais.aapen-arena-tiltaksgruppeendret-v1-q2" - tiltakEndret = "teamarenanais.aapen-arena-tiltakendret-v1-q2" - avtaleinfoEndret = "teamarenanais.aapen-arena-avtaleinfoendret-v1-q2" + consumer = [ + teamarenanais.aapen-arena-tiltakgjennomforingendret-v1-q2, + teamarenanais.aapen-arena-tiltakdeltakerendret-v1-q2, + teamarenanais.aapen-arena-tiltaksgruppeendret-v1-q2, + teamarenanais.aapen-arena-tiltakendret-v1-q2, + teamarenanais.aapen-arena-avtaleinfoendret-v1-q2, + ] } } }