Skip to content

Commit

Permalink
GEOMESA-3375 Kafka - add consumer props to admin client connection (#…
Browse files Browse the repository at this point in the history
…3134)

* Consumer props may contain connection parameters (SASL) set within GeoServer
  • Loading branch information
malinsinbigler authored Jul 9, 2024
1 parent 638b631 commit 0111826
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ class KafkaDataStore(
val topic = KafkaDataStore.topic(sft)
val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokers)
config.consumers.properties.foreach { case (k, v) => props.put(k, v) }
config.producers.properties.foreach { case (k, v) => props.put(k, v) }

WithClose(AdminClient.create(props)) { admin =>
Expand Down Expand Up @@ -266,6 +267,7 @@ class KafkaDataStore(
val topic = KafkaDataStore.topic(sft)
val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokers)
config.consumers.properties.foreach { case (k, v) => props.put(k, v) }
config.producers.properties.foreach { case (k, v) => props.put(k, v) }

WithClose(AdminClient.create(props)) { admin =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class KafkaMetadata[T](val config: KafkaDataStoreConfig, val serializer: Metadat
private def adminClientOp[V](fn: AdminClient => V): V = {
val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokers)
config.consumers.properties.foreach { case (k, v) => props.put(k, v) }
config.producers.properties.foreach { case (k, v) => props.put(k, v) }
WithClose(AdminClient.create(props)) { admin => fn(admin) }
}
Expand Down

0 comments on commit 0111826

Please sign in to comment.