Skip to content

Commit

Permalink
GEOMESA-3386 Kafka - CLI add consumer configuration needed for Zookeeper-less usage (#3158)
Browse files Browse the repository at this point in the history
  • Loading branch information
elahrvivaz authored Aug 28, 2024
1 parent 2b5f12d commit a4390ee
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 28 deletions.
17 changes: 12 additions & 5 deletions docs/user/kafka/commandline.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,18 @@ General Arguments
-----------------

Most commands require you to specify the connection to Kafka. This generally includes the list of
Kafka brokers and Zookeeper servers. Specify brokers with the ``--brokers`` (or ``-b``) argument, and
specify Zookeepers with ``--zookeepers`` (or ``-z``).

Kafka stores metadata under a particular path in Zookeeper - this can be thought of as a namespace
for feature types. Use ``--zkpath`` (or ``-p``) to override the default path.
Kafka brokers, specified with ``--brokers`` (or ``-b``). Connection properties can be specified with
``--producer-config`` for producers, ``--consumer-config`` for consumers, or ``--config`` which will
be applied to both. See the official Kafka documentation for the available
`producer <https://kafka.apache.org/documentation.html#producerconfigs>`_ and
`consumer <https://kafka.apache.org/documentation.html#consumerconfigs>`_ configs.

When using Zookeeper to store GeoMesa metadata, the Zookeeper servers must be specified with ``--zookeepers`` (or ``-z``).
The Zookeeper path for storing metadata can be specified with ``--zkpath`` (or ``-p``). See :ref:`no_zookeeper` for
details on when to use Zookeeper.

When not using Zookeeper, the GeoMesa catalog topic for storing metadata may be specified with ``--catalog`` (or ``-c``).
A catalog or path can be thought of as a namespace for feature types.

To connect to :ref:`Confluent Schema Registry <confluent_kds>` topics, use ``--schema-registry``
to provide the registry URL.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,20 @@ trait KafkaDataStoreCommand extends DataStoreCommand[KafkaDataStore] {
val readBack = Option(params.readBack).map(_.toString).getOrElse {
if (params.fromBeginning) { "Inf" } else { null }
}
val consumerProps =
Option(params.consumerProperties).map(FileUtils.readFileToString(_, StandardCharsets.UTF_8)).orNull
val producerProps =
Option(params.producerProperties).map(FileUtils.readFileToString(_, StandardCharsets.UTF_8)).orNull

val genericProps = if (params.genericProperties == null) { null } else {
FileUtils.readFileToString(params.genericProperties, StandardCharsets.UTF_8)
}

def mergeProps(f: File): String = {
if (f == null) { genericProps } else {
val p = FileUtils.readFileToString(f, StandardCharsets.UTF_8)
if (genericProps == null) { p } else { s"$genericProps\n$p" } // note: later keys overwrite earlier ones
}
}

val consumerProps = mergeProps(params.consumerProperties)
val producerProps = mergeProps(params.producerProperties)

Map[String, String](
KafkaDataStoreParams.Brokers.getName -> params.brokers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,15 @@ trait KafkaDataStoreParams {
@Parameter(names = Array("--schema-registry"), description = "URL to a Confluent Schema Registry")
var schemaRegistryUrl: String = _

def consumerProperties: File
def producerProperties: File
@Parameter(names = Array("--producer-config"), description = "Properties file used to configure the Kafka producer")
var producerProperties: File = _

@Parameter(names = Array("--consumer-config"), description = "Properties file used to configure the Kafka consumer")
var consumerProperties: File = _

@Parameter(names = Array("--config"), description = "Properties file used to configure the Kafka consumer/producer")
var genericProperties: File = _

def zookeepers: String
def numConsumers: Int
def replication: Int
Expand All @@ -55,15 +62,11 @@ trait ProducerDataStoreParams extends KafkaDataStoreParams {
@Parameter(names = Array("--partitions"), description = "Number of partitions for the Kafka topic")
var partitions: Int = 1 // note: can't use override modifier since it's a var

@Parameter(names = Array("--config"), description = "Properties file used to configure the Kafka producer")
var producerProperties: File = _

@Parameter(names = Array("--serialization"),
description = "Serialization format to use, ones of 'kryo', 'avro', or 'avro-native'",
validateValueWith = Array(classOf[SerializationValidator]))
var serialization: String = _

override val consumerProperties: File = null
override val numConsumers: Int = 0
override val readBack: Duration = null
override val fromBeginning: Boolean = false
Expand All @@ -83,12 +86,7 @@ trait ConsumerDataStoreParams extends KafkaDataStoreParams {
@Parameter(names = Array("--read-back"), description = "Consume messages written within this time frame, e.g. '1 hour'", converter = classOf[DurationConverter])
var readBack: Duration = _

@Parameter(names = Array("--config"), description = "Properties file used to configure the Kafka consumer")
var consumerProperties: File = _

override val producerProperties: File = null
override val serialization: String = null

override val replication: Int = 1
override val partitions: Int = 1
}
Expand All @@ -98,11 +96,7 @@ trait StatusDataStoreParams extends KafkaDataStoreParams {
@Parameter(names = Array("-z", "--zookeepers"), description = "Zookeepers (host[:port], comma separated)")
var zookeepers: String = _

@Parameter(names = Array("--config"), description = "Properties file used to configure the Kafka admin client")
var producerProperties: File = _

override val serialization: String = null
override val consumerProperties: File = null
override val numConsumers: Int = 0
override val replication: Int = 1
override val partitions: Int = 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,7 @@ object KafkaMigrateZkCommand {
@Parameter(names = Array("--delete"), description = "Delete the metadata out of Zookeeper after migration")
var delete: Boolean = false

@Parameter(names = Array("--config"), description = "Properties file used to configure the Kafka admin client")
var producerProperties: File = _

override val serialization: String = null
override val consumerProperties: File = null
override val partitions: Int = 1 // note: ignored for the metadata topic
override val numConsumers: Int = 0
override val readBack: Duration = null
Expand Down

0 comments on commit a4390ee

Please sign in to comment.