Skip to content

Commit

Permalink
Merge pull request #21 from navikt/feat/move-topics-to-config
Browse files Browse the repository at this point in the history
Flytt topics fra inline code til config fil
  • Loading branch information
Steffen Lien authored Feb 8, 2022
2 parents 9f3bf24 + 2a9deca commit 934d0ef
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,19 @@ import java.util.function.Consumer
class KafkaFactory(private val db: DatabaseFactory) {

private val logger = LoggerFactory.getLogger(KafkaFactory::class.java)
private val appConfig = HoconApplicationConfig(ConfigFactory.load())
private val kafkaConfig = HoconApplicationConfig(ConfigFactory.load()).config("ktor.kafka")
private val consumerClient: KafkaConsumerClient

init {
logger.debug("Initializing KafkaFactory.")

val topics = getConsumerTopics()
val consumerProperties = configureProperties()
val topics = configureTopics()
val consumerTopics = configureConsumersTopics(topics)

consumerClient = KafkaConsumerClientBuilder.builder()
.withProperties(consumerProperties)
.withTopicConfigs(topics)
.withTopicConfigs(consumerTopics)
.build()

consumerClient.start()
Expand All @@ -41,10 +42,13 @@ class KafkaFactory(private val db: DatabaseFactory) {
}

private fun configureProperties(): Properties {
val consumerGroupId = "mulighetsrommet-api-consumer.v1"
return if (appConfig.property("ktor.localDevelopment").getString() == "true") {
val consumerGroupId = kafkaConfig.property("consumerGroupId").getString()
val kafkaBrokers = kafkaConfig.property("kafkaBrokers").getString()
val isLocalDevelopment = HoconApplicationConfig(ConfigFactory.load()).property("ktor.localDevelopment").getString()
// // TODO: Discuss if we really need a local setup of kafka or not
return if (isLocalDevelopment == "true") {
KafkaPropertiesBuilder.consumerBuilder()
.withBrokerUrl("localhost:9092")
.withBrokerUrl(kafkaBrokers)
.withBaseProperties()
.withConsumerGroupId(consumerGroupId)
.withDeserializers(ByteArrayDeserializer::class.java, ByteArrayDeserializer::class.java)
Expand All @@ -54,12 +58,14 @@ class KafkaFactory(private val db: DatabaseFactory) {
}
}

private fun configureTopics(): List<KafkaConsumerClientBuilder.TopicConfig<String, String>> {
return KafkaTopics.values().map { it ->
private fun getConsumerTopics() = kafkaConfig.config("topics").property("consumer").getList()

private fun configureConsumersTopics(topics: List<String>): List<KafkaConsumerClientBuilder.TopicConfig<String, String>> {
return topics.map { topic ->
KafkaConsumerClientBuilder.TopicConfig<String, String>()
.withLogging()
.withConsumerConfig(
it.topic,
topic,
stringDeserializer(),
stringDeserializer(),
Consumer<ConsumerRecord<String, String>> { logTopicContent(it) }
Expand Down

This file was deleted.

16 changes: 10 additions & 6 deletions backend/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,17 @@ ktor {
}
kafka {
enable = true
appId = "mulighetsrommet-api"
clientId = "mulighetsrommet-api"
consumerGroupId = "mulighetsrommet-api-consumer.v1"
kafkaBrokers = "localhost:9092"
kafkaBrokers = ${?KAFKA_BROKERS}
host = "localhost"
host = ${?KAFKA_HOST}
port = 9002
port = ${?KAFKA_PORT}
topics {
consumer = [
teamarenanais.aapen-arena-tiltakgjennomforingendret-v1-q2,
teamarenanais.aapen-arena-tiltakdeltakerendret-v1-q2,
teamarenanais.aapen-arena-tiltaksgruppeendret-v1-q2,
teamarenanais.aapen-arena-tiltakendret-v1-q2,
teamarenanais.aapen-arena-avtaleinfoendret-v1-q2,
]
}
}
}

0 comments on commit 934d0ef

Please sign in to comment.