diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index b6a7ec7077e13..17417361efc4d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -19,6 +19,7 @@ import java.time.Duration; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -309,6 +310,25 @@ default ListTopicsResult listTopics() { */ ListTopicsResult listTopics(ListTopicsOptions options); + /** + * List all federated topic znodes + * @return all federated topic znodes, formatted /namespace/topic + */ + default ListFederatedTopicZnodesResult listFederatedTopicZnodes() { + return listFederatedTopicZnodes(Collections.emptyList(), new ListFederatedTopicZnodesOptions()); + } + + /** + * List federated topic znodes match given topic names; if empty list passed, all existing federated topic znodes + * will be listed + * @param federatedTopics topic names + * @param options the options to use when list federated topic znodes + * @return empty list if the given topic names' znode don't exist; otherwise return the federated topics formatted + * /namespace/topic + */ + ListFederatedTopicZnodesResult listFederatedTopicZnodes(List federatedTopics, + ListFederatedTopicZnodesOptions options); + /** * Describe some topics in the cluster, with the default options. *

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index e400ffe9c66a7..f99d67d942f1d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -143,6 +143,7 @@ import org.apache.kafka.common.message.LiControlledShutdownSkipSafetyCheckRequestData; import org.apache.kafka.common.message.LiCreateFederatedTopicZnodesRequestData; import org.apache.kafka.common.message.LiDeleteFederatedTopicZnodesRequestData; +import org.apache.kafka.common.message.LiListFederatedTopicZnodesRequestData; import org.apache.kafka.common.message.LiMoveControllerRequestData; import org.apache.kafka.common.message.ListGroupsRequestData; import org.apache.kafka.common.message.ListGroupsResponseData; @@ -221,6 +222,8 @@ import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse; import org.apache.kafka.common.requests.LiControlledShutdownSkipSafetyCheckRequest; import org.apache.kafka.common.requests.LiControlledShutdownSkipSafetyCheckResponse; +import org.apache.kafka.common.requests.LiListFederatedTopicZnodesRequest; +import org.apache.kafka.common.requests.LiListFederatedTopicZnodesResponse; import org.apache.kafka.common.requests.LiMoveControllerRequest; import org.apache.kafka.common.requests.LiMoveControllerResponse; import org.apache.kafka.common.requests.LiCreateFederatedTopicZnodesRequest; @@ -2029,6 +2032,45 @@ void handleFailure(Throwable throwable) { return new ListTopicsResult(topicListingFuture); } + @Override + public ListFederatedTopicZnodesResult listFederatedTopicZnodes(List federatedTopics, + ListFederatedTopicZnodesOptions options) { + final KafkaFutureImpl> federatedTopicZnodesListingFuture = new KafkaFutureImpl<>(); + List topicsRequested = new ArrayList<>(); + federatedTopics.forEach(topic -> + topicsRequested.add(new LiListFederatedTopicZnodesRequestData.FederatedTopics().setName(topic))); + final long now = time.milliseconds(); + runnable.call(new Call("listFederatedTopicZnodes", calcDeadlineMs(now, options.timeoutMs()), + new LeastLoadedNodeProvider()) { + + @Override + AbstractRequest.Builder createRequest(int timeoutMs) { + return new LiListFederatedTopicZnodesRequest.Builder( + new LiListFederatedTopicZnodesRequestData().setTopics(topicsRequested), (short) 0 + ); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + LiListFederatedTopicZnodesResponse response = (LiListFederatedTopicZnodesResponse) abstractResponse; + Errors error = Errors.forCode(response.data().errorCode()); + if (error != Errors.NONE) { + federatedTopicZnodesListingFuture.completeExceptionally(error.exception()); + return; + } + + List allFederatedTopicZnodes = new ArrayList<>(response.data().topics()); + federatedTopicZnodesListingFuture.complete(allFederatedTopicZnodes); + } + + @Override + void handleFailure(Throwable throwable) { + federatedTopicZnodesListingFuture.completeExceptionally(throwable); + } + }, now); + return new ListFederatedTopicZnodesResult(federatedTopicZnodesListingFuture); + } + @Override public DescribeTopicsResult describeTopics(final Collection topicNames, DescribeTopicsOptions options) { final Map> topicFutures = new HashMap<>(topicNames.size()); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListFederatedTopicZnodesOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListFederatedTopicZnodesOptions.java new file mode 100644 index 0000000000000..3e8a05315a0da --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListFederatedTopicZnodesOptions.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + + +/** + * Options for {@link Admin#listFederatedTopicZnodes()}. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class ListFederatedTopicZnodesOptions extends AbstractOptions { +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListFederatedTopicZnodesResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListFederatedTopicZnodesResult.java new file mode 100644 index 0000000000000..0f997152e21fe --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListFederatedTopicZnodesResult.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import java.util.List; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; + + +/** + * The result of the {@link Admin#listFederatedTopicZnodes()} call. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class ListFederatedTopicZnodesResult { + final KafkaFuture> future; + + ListFederatedTopicZnodesResult(KafkaFuture> future) { + this.future = future; + } + + /** + * Return a future which yields a list of federated topic names + */ + public KafkaFuture> topics() { + return future; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/NoOpAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/NoOpAdminClient.java index 6e49a12f13bff..57a3acc009bf7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/NoOpAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/NoOpAdminClient.java @@ -68,6 +68,12 @@ public ListTopicsResult listTopics(ListTopicsOptions options) { return null; } + @Override + public ListFederatedTopicZnodesResult listFederatedTopicZnodes(List federatedTopics, + ListFederatedTopicZnodesOptions options) { + return null; + } + @Override public DescribeTopicsResult describeTopics(Collection topicNames, DescribeTopicsOptions options) { return null; diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index a4f09b78ac4b1..8de25faaff839 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -116,7 +116,8 @@ public enum ApiKeys { LI_MOVE_CONTROLLER(ApiMessageType.LI_MOVE_CONTROLLER, true), LI_CREATE_FEDERATED_TOPIC_ZNODES(ApiMessageType.LI_CREATE_FEDERATED_TOPIC_ZNODES, false, true), - LI_DELETE_FEDERATED_TOPIC_ZNODES(ApiMessageType.LI_DELETE_FEDERATED_TOPIC_ZNODES, false, true); + LI_DELETE_FEDERATED_TOPIC_ZNODES(ApiMessageType.LI_DELETE_FEDERATED_TOPIC_ZNODES, false, true), + LI_LIST_FEDERATED_TOPIC_ZNODES(ApiMessageType.LI_LIST_FEDERATED_TOPIC_ZNODES); private static final Map> APIS_BY_LISTENER = new EnumMap<>(ApiMessageType.ListenerType.class); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 856c06f584a74..16f3e1dfbccf8 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -313,6 +313,8 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion, return LiCreateFederatedTopicZnodesRequest.parse(buffer, apiVersion); case LI_DELETE_FEDERATED_TOPIC_ZNODES: return LiDeleteFederatedTopicZnodesRequest.parse(buffer, apiVersion); + case LI_LIST_FEDERATED_TOPIC_ZNODES: + return LiListFederatedTopicZnodesRequest.parse(buffer, apiVersion); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 7d0d9bb88438a..0e515450f4309 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -257,6 +257,8 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, ByteBuffer response return LiCreateFederatedTopicZnodesResponse.parse(responseBuffer, version); case LI_DELETE_FEDERATED_TOPIC_ZNODES: return LiDeleteFederatedTopicZnodesResponse.parse(responseBuffer, version); + case LI_LIST_FEDERATED_TOPIC_ZNODES: + return LiListFederatedTopicZnodesResponse.parse(responseBuffer, version); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LiListFederatedTopicZnodesRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LiListFederatedTopicZnodesRequest.java new file mode 100644 index 0000000000000..4794509b837bf --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/LiListFederatedTopicZnodesRequest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import java.nio.ByteBuffer; +import java.util.Collections; +import org.apache.kafka.common.message.LiListFederatedTopicZnodesRequestData; +import org.apache.kafka.common.message.LiListFederatedTopicZnodesResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + + +public class LiListFederatedTopicZnodesRequest extends AbstractRequest { + public static class Builder extends AbstractRequest.Builder { + + private final LiListFederatedTopicZnodesRequestData data; + + public Builder(LiListFederatedTopicZnodesRequestData data, short allowedVersion) { + super(ApiKeys.LI_LIST_FEDERATED_TOPIC_ZNODES, allowedVersion); + this.data = data; + } + + @Override + public LiListFederatedTopicZnodesRequest build(short version) { + return new LiListFederatedTopicZnodesRequest(data, version); + } + + @Override + public String toString() { + return data.toString(); + } + } + + private final LiListFederatedTopicZnodesRequestData data; + + public LiListFederatedTopicZnodesRequest(LiListFederatedTopicZnodesRequestData data, short version) { + super(ApiKeys.LI_LIST_FEDERATED_TOPIC_ZNODES, version); + this.data = data; + } + + @Override + public LiListFederatedTopicZnodesResponse getErrorResponse(int throttleTimeMs, Throwable e) { + LiListFederatedTopicZnodesResponseData data = new LiListFederatedTopicZnodesResponseData(). + setTopics(Collections.emptyList()). + setThrottleTimeMs(throttleTimeMs). + setErrorCode(Errors.forException(e).code()); + return new LiListFederatedTopicZnodesResponse(data, version()); + } + + public static LiListFederatedTopicZnodesRequest parse(ByteBuffer buffer, short version) { + return new LiListFederatedTopicZnodesRequest( + new LiListFederatedTopicZnodesRequestData(new ByteBufferAccessor(buffer), version), version + ); + } + + @Override + public LiListFederatedTopicZnodesRequestData data() { + return data; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LiListFederatedTopicZnodesResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LiListFederatedTopicZnodesResponse.java new file mode 100644 index 0000000000000..feb3719f98d4c --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/LiListFederatedTopicZnodesResponse.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import java.nio.ByteBuffer; +import java.util.Map; +import org.apache.kafka.common.message.LiListFederatedTopicZnodesResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + + +public class LiListFederatedTopicZnodesResponse extends AbstractResponse { + private final LiListFederatedTopicZnodesResponseData data; + private final short version; + + public LiListFederatedTopicZnodesResponse(LiListFederatedTopicZnodesResponseData data, short version) { + super(ApiKeys.LI_LIST_FEDERATED_TOPIC_ZNODES); + this.data = data; + this.version = version; + } + + @Override + public LiListFederatedTopicZnodesResponseData data() { + return data; + } + + @Override + public int throttleTimeMs() { + return data.throttleTimeMs(); + } + + @Override + public Map errorCounts() { + return errorCounts(Errors.forCode(data.errorCode())); + } + + public static LiListFederatedTopicZnodesResponse parse(ByteBuffer buffer, short version) { + return new LiListFederatedTopicZnodesResponse( + new LiListFederatedTopicZnodesResponseData(new ByteBufferAccessor(buffer), version), version + ); + } + + public static LiListFederatedTopicZnodesResponse prepareResponse(Errors error, int throttleTimeMs, short version) { + LiListFederatedTopicZnodesResponseData data = new LiListFederatedTopicZnodesResponseData(); + data.setErrorCode(error.code()); + data.setThrottleTimeMs(throttleTimeMs); + return new LiListFederatedTopicZnodesResponse(data, version); + } + + public short version() { + return version; + } + + @Override + public String toString() { + return data.toString(); + } +} diff --git a/clients/src/main/resources/common/message/LiListFederatedTopicZnodesRequest.json b/clients/src/main/resources/common/message/LiListFederatedTopicZnodesRequest.json new file mode 100644 index 0000000000000..4ba7709ef6220 --- /dev/null +++ b/clients/src/main/resources/common/message/LiListFederatedTopicZnodesRequest.json @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 1005, + "type": "request", + "listeners": ["zkBroker", "broker"], + "name": "LiListFederatedTopicZnodesRequest", + "validVersions": "0-1", + "flexibleVersions": "0+", + "fields": [ + { "name": "Topics", "type": "[]FederatedTopics", "versions": "0+", "about": "The simple name of the federated topics", + "fields": [ + {"name": "Name", "type": "string", "versions": "0+", "about": "The topic name"} + ]} + ] +} diff --git a/clients/src/main/resources/common/message/LiListFederatedTopicZnodesResponse.json b/clients/src/main/resources/common/message/LiListFederatedTopicZnodesResponse.json new file mode 100644 index 0000000000000..1683c71a65238 --- /dev/null +++ b/clients/src/main/resources/common/message/LiListFederatedTopicZnodesResponse.json @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 1005, + "type": "response", + "name": "LiListFederatedTopicZnodesResponse", + "validVersions": "0-1", + "flexibleVersions": "0+", + "fields": [ + { "name": "Topics", "type": "[]string", "versions": "0+", "about": "The name and namespace of the federated topic"}, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The top-level error code." }, + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true, + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." } + ] +} diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index 7cbd95d8ad8de..60cd6d83ad581 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -360,6 +360,12 @@ synchronized public ListTopicsResult listTopics(ListTopicsOptions options) { return new ListTopicsResult(future); } + @Override + public ListFederatedTopicZnodesResult listFederatedTopicZnodes(List federatedTopics, + ListFederatedTopicZnodesOptions options) { + throw new UnsupportedOperationException("Not implemented yet"); + } + @Override synchronized public DescribeTopicsResult describeTopics(Collection topicNames, DescribeTopicsOptions options) { Map> topicDescriptions = new HashMap<>(); diff --git a/core/src/main/scala/kafka/network/RequestConvertToJson.scala b/core/src/main/scala/kafka/network/RequestConvertToJson.scala index cb9ddccf8c29b..9f121a930ed8b 100644 --- a/core/src/main/scala/kafka/network/RequestConvertToJson.scala +++ b/core/src/main/scala/kafka/network/RequestConvertToJson.scala @@ -100,6 +100,7 @@ object RequestConvertToJson { case req: LiMoveControllerRequest => LiMoveControllerRequestDataJsonConverter.write(req.data, request.version()) case req: LiCreateFederatedTopicZnodesRequest => LiCreateFederatedTopicZnodesRequestDataJsonConverter.write(req.data, request.version()) case req: LiDeleteFederatedTopicZnodesRequest => LiDeleteFederatedTopicZnodesRequestDataJsonConverter.write(req.data, request.version()) + case req: LiListFederatedTopicZnodesRequest => LiListFederatedTopicZnodesRequestDataJsonConverter.write(req.data, request.version()) case _ => throw new IllegalStateException(s"ApiKey ${request.apiKey} is not currently handled in `request`, the " + "code should be updated to do so."); } @@ -180,6 +181,7 @@ object RequestConvertToJson { case res: LiMoveControllerResponse => LiMoveControllerResponseDataJsonConverter.write(res.data, version) case res: LiCreateFederatedTopicZnodesResponse => LiCreateFederatedTopicZnodesResponseDataJsonConverter.write(res.data, version) case res: LiDeleteFederatedTopicZnodesResponse => LiDeleteFederatedTopicZnodesResponseDataJsonConverter.write(res.data, version) + case res: LiListFederatedTopicZnodesResponse => LiListFederatedTopicZnodesResponseDataJsonConverter.write(res.data, version) case _ => throw new IllegalStateException(s"ApiKey ${response.apiKey} is not currently handled in `response`, the " + "code should be updated to do so."); } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 854525381a4b6..35e3a405cf7ea 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -262,6 +262,8 @@ class KafkaApis(val requestChannel: RequestChannel, // LI_DELETE_FEDERATED_TOPIC_ZNODES is decoupled from DELETE_TOPICS and only changes the ACL validation behavior // within kafka-server level case ApiKeys.LI_DELETE_FEDERATED_TOPIC_ZNODES => maybeForwardToController(request, handleDeleteFederatedTopicZnodesRequest) + // LI_LIST_FEDERATED_TOPIC_ZNODES is decoupled from LIST_TOPICS and not affect any other operation + case ApiKeys.LI_LIST_FEDERATED_TOPIC_ZNODES => handleListFederatedTopicZnodesRequest(request) case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}") } } catch { @@ -3813,6 +3815,57 @@ class KafkaApis(val requestChannel: RequestChannel, } } } + + def handleListFederatedTopicZnodesRequest(request: RequestChannel.Request): Unit = { + val listfederatedTopicZnodesRequest = request.body[LiListFederatedTopicZnodesRequest] + val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request)) + val requestedTopics = listfederatedTopicZnodesRequest.data().topics() + + // only do authorization on cluster level, users are not expected to track this directly via kafka + if (!authHelper.authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME)) { + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => + LiListFederatedTopicZnodesResponse.prepareResponse( + Errors.CLUSTER_AUTHORIZATION_FAILED, requestThrottleMs, listfederatedTopicZnodesRequest.version() + ) + ) + return + } + + try { + if (requestedTopics.isEmpty) { + // if empty list passed, list all existing federated topics + val allFederatedTopicZnodes = zkSupport.zkClient.getAllFederatedTopics.toList.asJava + + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => + new LiListFederatedTopicZnodesResponse( + new LiListFederatedTopicZnodesResponseData().setTopics(allFederatedTopicZnodes) + .setThrottleTimeMs(requestThrottleMs), listfederatedTopicZnodesRequest.version() + ) + ) + } else { + // if non-empty list passed, only list znode values for the given topics + val foundFederatedTopicZnodes = mutable.Set[String]() + + requestedTopics.forEach(topic => { + val curFederatedTopicZnode = zkSupport.zkClient.getFederatedTopic(topic.name()) + if (curFederatedTopicZnode != null) { + foundFederatedTopicZnodes.add(curFederatedTopicZnode) + } + }) + + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => + new LiListFederatedTopicZnodesResponse( + new LiListFederatedTopicZnodesResponseData() + .setTopics(foundFederatedTopicZnodes.toList.asJava).setThrottleTimeMs(requestThrottleMs), + listfederatedTopicZnodesRequest.version() + ) + ) + } + } catch { + case throwable: Throwable => + requestHelper.sendResponseExemptThrottle(request, listfederatedTopicZnodesRequest.getErrorResponse(throwable)) + } + } } object KafkaApis { diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 837fc0515eaa5..6f95bb0e50f27 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -145,6 +145,16 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, }.toMap } + def getFederatedTopic(topic: String): String = { + val getDataRequest = GetDataRequest(FederatedTopicZnode.path(topic)) + val getDataResponse = retryRequestUntilConnected(getDataRequest) + getDataResponse.resultCode match { + case Code.OK => "/" + FederatedTopicZnode.decode(getDataResponse.data) + "/" + topic + case Code.NONODE => null + case _ => throw getDataResponse.resultException.get + } + } + def getAllFederatedTopics: Set[String] = { val topics = getChildren(FederatedTopicsZNode.path) diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 9aaba26844df3..4117206522c1f 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -136,7 +136,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { def testFederatedTopicCreateDeleteAndGet(): Unit = { client = Admin.create(createConfig) val federatedTopic = Map("federated-test-topic" -> "tracking").asJava - val expectedFedTopic = Seq("/tracking/federated-test-topic") + val expectedFedTopicString = "/tracking/federated-test-topic" + val expectedFedTopic = Seq(expectedFedTopicString) // create one federated topic client.createFederatedTopicZnodes(federatedTopic) // since creation is async, wait for federated topic znodes to be created @@ -146,6 +147,21 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val federatedTopics = zkClient.getAllFederatedTopics assertEquals(1, federatedTopics.size) + // test list federated topic znodes api + // 1. test list all + val allZnodes = client.listFederatedTopicZnodes().topics().get() + assertEquals(1, allZnodes.size()) + assertEquals(expectedFedTopicString, allZnodes.get(0)) + // 2. test list specific success + val expectedSuccess = client.listFederatedTopicZnodes(Collections.singletonList("federated-test-topic"), + new ListFederatedTopicZnodesOptions()).topics().get() + assertEquals(1, expectedSuccess.size()) + assertEquals(expectedFedTopicString, expectedSuccess.get(0)) + // 3. test list specific fail + val expectedFail = client.listFederatedTopicZnodes(Collections.singletonList("non-exist-topic"), + new ListFederatedTopicZnodesOptions()).topics().get() + assertEquals(0, expectedFail.size()) + // delete federated topic znode client.deleteFederatedTopicZnodes(federatedTopic) diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index e0ced640ee1b9..2c1f33bb23e2d 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -667,6 +667,9 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.LI_DELETE_FEDERATED_TOPIC_ZNODES => new LiDeleteFederatedTopicZnodesRequest.Builder(new LiDeleteFederatedTopicZnodesRequestData(), ApiKeys.LI_DELETE_FEDERATED_TOPIC_ZNODES.latestVersion) + case ApiKeys.LI_LIST_FEDERATED_TOPIC_ZNODES => + new LiListFederatedTopicZnodesRequest.Builder(new LiListFederatedTopicZnodesRequestData(), ApiKeys.LI_LIST_FEDERATED_TOPIC_ZNODES.latestVersion) + case _ => throw new IllegalArgumentException("Unsupported API key " + apiKey) }