From 01118268fa8d7f4f30ccbff3bd002284f5cdc63a Mon Sep 17 00:00:00 2001 From: Michael Linsinbigler Date: Tue, 9 Jul 2024 11:21:11 -0400 Subject: [PATCH] GEOMESA-3375 Kafka - add consumer props to admin client connection (#3134) * Consumer props may contain connection parameters (SASL) set within GeoServer --- .../org/locationtech/geomesa/kafka/data/KafkaDataStore.scala | 2 ++ .../org/locationtech/geomesa/kafka/data/KafkaMetadata.scala | 1 + 2 files changed, 3 insertions(+) diff --git a/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStore.scala b/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStore.scala index 1101b2407c1c..34efecb5397f 100644 --- a/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStore.scala +++ b/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStore.scala @@ -229,6 +229,7 @@ class KafkaDataStore( val topic = KafkaDataStore.topic(sft) val props = new Properties() props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokers) + config.consumers.properties.foreach { case (k, v) => props.put(k, v) } config.producers.properties.foreach { case (k, v) => props.put(k, v) } WithClose(AdminClient.create(props)) { admin => @@ -266,6 +267,7 @@ class KafkaDataStore( val topic = KafkaDataStore.topic(sft) val props = new Properties() props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokers) + config.consumers.properties.foreach { case (k, v) => props.put(k, v) } config.producers.properties.foreach { case (k, v) => props.put(k, v) } WithClose(AdminClient.create(props)) { admin => diff --git a/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaMetadata.scala b/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaMetadata.scala index fae229dbb090..a37644ac4424 100644 --- a/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaMetadata.scala +++ b/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaMetadata.scala @@ -111,6 +111,7 @@ class KafkaMetadata[T](val config: KafkaDataStoreConfig, val serializer: Metadat private def adminClientOp[V](fn: AdminClient => V): V = { val props = new Properties() props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokers) + config.consumers.properties.foreach { case (k, v) => props.put(k, v) } config.producers.properties.foreach { case (k, v) => props.put(k, v) } WithClose(AdminClient.create(props)) { admin => fn(admin) } }