Skip to content

Commit

Permalink
Merge pull request vert-x3#122 from vert-x3/kafka-admin-client
Browse files Browse the repository at this point in the history
Added initial Kafka Admin Client implementation
  • Loading branch information
ppatierno authored Feb 26, 2019
2 parents 0da696e + d51c840 commit a05d5c1
Show file tree
Hide file tree
Showing 22 changed files with 2,531 additions and 168 deletions.
98 changes: 98 additions & 0 deletions src/main/asciidoc/adminclient.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
= Vert.x Kafka Admin Client
:toc: left
:lang: $lang
:$lang: $lang

This component provides a Vert.x wrapper around the Kafka Admin Client API.
The Kafka Admin Client is used to create, modify, and delete topics.
It also provides methods for handling ACLs (Access Control Lists), consumer groups and many more.

== Creating the Kafka Admin Client

Creating the admin client is quite similar on how it works using the native Kafka client library.

It needs to be configured with a bunch of properties as described in the official
Apache Kafka documentation, for the link:https://kafka.apache.org/documentation/#adminclientconfigs[admin].

To achieve that, a map can be configured with such properties passing it to one of the
static creation methods exposed by {@link io.vertx.kafka.admin.KafkaAdminClient}.

[source,$lang]
----
{@link examples.KafkaAdminClientExamples#exampleCreateAdminClient}
----

== Using the Kafka Admin Client

=== Listing topics

You can call the {@link io.vertx.kafka.admin.KafkaAdminClient#listTopics} for listing the topics in the cluster.
The only parameter is the usual callback to handle the result, which provides the topics list.

[source,$lang]
----
{@link examples.KafkaAdminClientExamples#exampleListTopics}
----

=== Describe topics

You can call {@link io.vertx.kafka.admin.KafkaAdminClient#describeTopics} to describe topics in the cluster.
Describing a topic means getting all related metadata like number of partitions, replicas, leader, in-sync replicas and so on.
The needed parameters are the list of topics names to describe, and the usual callback to handle the result providing
a map with topic names and related {@link io.vertx.kafka.admin.TopicDescription}.

[source,$lang]
----
{@link examples.KafkaAdminClientExamples#exampleDescribeTopics}
----

=== Create topic

You can call {@link io.vertx.kafka.admin.KafkaAdminClient#createTopics} to create topics in the cluster.
The needed parameters are the list of the topics to create, and the usual callback to handle the result.
The topics to create are defined via the {@link io.vertx.kafka.admin.NewTopic} class specifying the name, the number of
partitions and the replication factor.
It is also possible to describe the replicas assignment, mapping each replica to the broker id, instead of specifying the
number of partitions and the replication factor (which in this case has to be set to -1).

[source,$lang]
----
{@link examples.KafkaAdminClientExamples#exampleCreateTopics}
----

=== Delete topic

You can call {@link io.vertx.kafka.admin.KafkaAdminClient#deleteTopics} to delete topics in the cluster.
The needed parameters are the list of the topics to delete, and the usual callback to handle the result.

[source,$lang]
----
{@link examples.KafkaAdminClientExamples#exampleDeleteTopics}
----

=== Describe configuration

You can call {@link io.vertx.kafka.admin.KafkaAdminClient#describeConfigs} to describe resources configuration.
Describing resources configuration means getting all configuration information for cluster resources like topics or brokers.
The needed parameters are the list of the resources for which you want the configuration, and the usual callback to handle the result.
The resources are described by a collection of {@link io.vertx.kafka.client.common.ConfigResource} while the result maps
each resource with a corresponding {@link io.vertx.kafka.admin.Config} which as more {@link io.vertx.kafka.admin.ConfigEntry} for
each configuration parameter.

[source,$lang]
----
{@link examples.KafkaAdminClientExamples#exampleDescribeConfigs}
----

=== Alter configuration

You can call {@link io.vertx.kafka.admin.KafkaAdminClient#alterConfigs} to alter resources configuration.
Altering resources configuration means updating configuration information for cluster resources like topics or brokers.
The needed parameters are the list of the resources with the related configurations to updated, and the usual callback to handle the result.
It is possible to alter configurations for different resources with just one call. The input parameter maps each
{@link io.vertx.kafka.client.common.ConfigResource} with the corresponding {@link io.vertx.kafka.admin.Config} you want to apply.

[source,$lang]
----
{@link examples.KafkaAdminClientExamples#exampleAlterConfigs}
----
174 changes: 174 additions & 0 deletions src/main/asciidoc/dataobjects.adoc
Original file line number Diff line number Diff line change
@@ -1,5 +1,129 @@
= Cheatsheets

[[Config]]
== Config

++++
A configuration object containing the configuration entries for a resource
++++
'''

[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
|[[entries]]`@entries`|`Array of link:dataobjects.html#ConfigEntry[ConfigEntry]`|+++
Set the configuration entries for a resource
+++
|===

[[ConfigEntry]]
== ConfigEntry

++++
A class representing a configuration entry containing name, value and additional metadata
++++
'''

[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
|[[default]]`@default`|`Boolean`|+++
Set whether the config value is the default or if it's been explicitly set
+++
|[[name]]`@name`|`String`|+++
Set the config name
+++
|[[readOnly]]`@readOnly`|`Boolean`|+++
Set whether the config is read-only and cannot be updated
+++
|[[sensitive]]`@sensitive`|`Boolean`|+++
Set whether the config value is sensitive. The value is always set to null by the broker if the config value is sensitive
+++
|[[source]]`@source`|`link:enums.html#ConfigSource[ConfigSource]`|+++
Set the source of this configuration entry
+++
|[[synonyms]]`@synonyms`|`Array of link:dataobjects.html#ConfigSynonym[ConfigSynonym]`|+++
Set all config values that may be used as the value of this config along with their source, in the order of precedence
+++
|[[value]]`@value`|`String`|+++
Set the value or null. Null is returned if the config is unset or if isSensitive is true
+++
|===

[[ConfigResource]]
== ConfigResource

++++
A class representing resources that have configuration
++++
'''

[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
|[[default]]`@default`|`Boolean`|+++
Set if this is the default resource of a resource type. Resource name is empty for the default resource.
+++
|[[name]]`@name`|`String`|+++
Set the resource name
+++
|[[type]]`@type`|`link:enums.html#Type[Type]`|+++
Set the resource type
+++
|===

[[ConfigSynonym]]
== ConfigSynonym

++++
Class representing a configuration synonym of a link
++++
'''

[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
|[[name]]`@name`|`String`|+++
Set the name of this configuration
+++
|[[source]]`@source`|`link:enums.html#ConfigSource[ConfigSource]`|+++
Set the source of this configuration
+++
|[[value]]`@value`|`String`|+++
Set the value of this configuration, which may be null if the configuration is sensitive
+++
|===

[[NewTopic]]
== NewTopic

++++
A new topic to be created
++++
'''

[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
|[[config]]`@config`|`String`|+++
Set the configuration for the new topic or null if no configs ever specified
+++
|[[name]]`@name`|`String`|+++
Set the name of the topic to be created
+++
|[[numPartitions]]`@numPartitions`|`Number (int)`|+++
Set the number of partitions for the new topic or -1 if a replica assignment has been specified
+++
|[[replicationFactor]]`@replicationFactor`|`Number (short)`|+++
Set the replication factor for the new topic or -1 if a replica assignment has been specified
+++
|===

[[Node]]
== Node

Expand Down Expand Up @@ -136,6 +260,30 @@ Set the topic the record was appended to
+++
|===

[[TopicDescription]]
== TopicDescription

++++
A detailed description of a single topic in the cluster
++++
'''

[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
|[[internal]]`@internal`|`Boolean`|+++
Set whether the topic is internal to Kafka.
+++
|[[name]]`@name`|`String`|+++
Set the name of the topic.
+++
|[[partitions]]`@partitions`|`Array of link:dataobjects.html#TopicPartitionInfo[TopicPartitionInfo]`|+++
Set A list of partitions where the index represents the partition id and the element
contains leadership and replica information for that partition.
+++
|===

[[TopicPartition]]
== TopicPartition

Expand All @@ -156,3 +304,29 @@ Set the topic name
+++
|===

[[TopicPartitionInfo]]
== TopicPartitionInfo

++++
A class containing leadership, replicas and ISR information for a topic partition.
++++
'''

[cols=">25%,25%,50%"]
[frame="topbot"]
|===
^|Name | Type ^| Description
|[[isr]]`@isr`|`Array of link:dataobjects.html#Node[Node]`|+++
Set the subset of the replicas that are in sync
+++
|[[leader]]`@leader`|`link:dataobjects.html#Node[Node]`|+++
Set the node id of the node currently acting as a leader
+++
|[[partition]]`@partition`|`Number (int)`|+++
Set the partition id
+++
|[[replicas]]`@replicas`|`Array of link:dataobjects.html#Node[Node]`|+++
Set the complete set of replicas for this partition
+++
|===

42 changes: 42 additions & 0 deletions src/main/generated/io/vertx/kafka/admin/ConfigConverter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.vertx.kafka.admin;

import io.vertx.core.json.JsonObject;
import io.vertx.core.json.JsonArray;
import java.time.Instant;
import java.time.format.DateTimeFormatter;

/**
* Converter for {@link io.vertx.kafka.admin.Config}.
* NOTE: This class has been automatically generated from the {@link io.vertx.kafka.admin.Config} original class using Vert.x codegen.
*/
public class ConfigConverter {

public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, Config obj) {
for (java.util.Map.Entry<String, Object> member : json) {
switch (member.getKey()) {
case "entries":
if (member.getValue() instanceof JsonArray) {
java.util.ArrayList<io.vertx.kafka.admin.ConfigEntry> list = new java.util.ArrayList<>();
((Iterable<Object>)member.getValue()).forEach( item -> {
if (item instanceof JsonObject)
list.add(new io.vertx.kafka.admin.ConfigEntry((JsonObject)item));
});
obj.setEntries(list);
}
break;
}
}
}

public static void toJson(Config obj, JsonObject json) {
toJson(obj, json.getMap());
}

public static void toJson(Config obj, java.util.Map<String, Object> json) {
if (obj.getEntries() != null) {
JsonArray array = new JsonArray();
obj.getEntries().forEach(item -> array.add(item.toJson()));
json.put("entries", array);
}
}
}
Loading

0 comments on commit a05d5c1

Please sign in to comment.