From a62822a0c524cb335a4094392139726858b94b51 Mon Sep 17 00:00:00 2001 From: Ke Hu Date: Mon, 6 Nov 2023 14:22:53 -0800 Subject: [PATCH] [LI-HOTFIX] Add list federated topic znodes rpc (#493) This PR adds list federated topic znodes rpc that either list all federated topic znodes, or only given federated topic znodes matching the input simple topic names. LI_DESCRIPTION = To make all zk access go through kafka broker, in addition to the previous create/delete federated topic znodes rpcs, we also need to add read/list federated topic znodes rpc. Added integ-test to verify list federated topic znode api EXIT_CRITERIA = We can deprecate this pr when all kafka clients have been migrated to xinfra clients and all topic CUDs go through xmd, then we don't need kafka broker to understand both old and new topic acl, it will only need to understand the new acl. --- .../org/apache/kafka/clients/admin/Admin.java | 20 +++++ .../kafka/clients/admin/KafkaAdminClient.java | 42 ++++++++++ .../ListFederatedTopicZnodesOptions.java | 30 ++++++++ .../admin/ListFederatedTopicZnodesResult.java | 44 +++++++++++ .../kafka/clients/admin/NoOpAdminClient.java | 6 ++ .../apache/kafka/common/protocol/ApiKeys.java | 3 +- .../common/requests/AbstractRequest.java | 2 + .../common/requests/AbstractResponse.java | 2 + .../LiListFederatedTopicZnodesRequest.java | 76 +++++++++++++++++++ .../LiListFederatedTopicZnodesResponse.java | 74 ++++++++++++++++++ .../LiListFederatedTopicZnodesRequest.json | 29 +++++++ .../LiListFederatedTopicZnodesResponse.json | 29 +++++++ .../kafka/clients/admin/MockAdminClient.java | 6 ++ .../kafka/network/RequestConvertToJson.scala | 2 + .../main/scala/kafka/server/KafkaApis.scala | 53 +++++++++++++ .../main/scala/kafka/zk/KafkaZkClient.scala | 10 +++ .../api/PlaintextAdminIntegrationTest.scala | 18 ++++- .../unit/kafka/server/RequestQuotaTest.scala | 3 + 18 files changed, 447 insertions(+), 2 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/ListFederatedTopicZnodesOptions.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/ListFederatedTopicZnodesResult.java create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/LiListFederatedTopicZnodesRequest.java create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/LiListFederatedTopicZnodesResponse.java create mode 100644 clients/src/main/resources/common/message/LiListFederatedTopicZnodesRequest.json create mode 100644 clients/src/main/resources/common/message/LiListFederatedTopicZnodesResponse.json diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index b6a7ec7077e13..17417361efc4d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -19,6 +19,7 @@ import java.time.Duration; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -309,6 +310,25 @@ default ListTopicsResult listTopics() { */ ListTopicsResult listTopics(ListTopicsOptions options); + /** + * List all federated topic znodes + * @return all federated topic znodes, formatted /namespace/topic + */ + default ListFederatedTopicZnodesResult listFederatedTopicZnodes() { + return listFederatedTopicZnodes(Collections.emptyList(), new ListFederatedTopicZnodesOptions()); + } + + /** + * List federated topic znodes match given topic names; if empty list passed, all existing federated topic znodes + * will be listed + * @param federatedTopics topic names + * @param options the options to use when list federated topic znodes + * @return empty list if the given topic names' znode don't exist; otherwise return the federated topics formatted + * /namespace/topic + */ + ListFederatedTopicZnodesResult listFederatedTopicZnodes(List federatedTopics, + ListFederatedTopicZnodesOptions options); + /** * Describe some topics in the cluster, with the default options. *

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index e400ffe9c66a7..f99d67d942f1d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -143,6 +143,7 @@ import org.apache.kafka.common.message.LiControlledShutdownSkipSafetyCheckRequestData; import org.apache.kafka.common.message.LiCreateFederatedTopicZnodesRequestData; import org.apache.kafka.common.message.LiDeleteFederatedTopicZnodesRequestData; +import org.apache.kafka.common.message.LiListFederatedTopicZnodesRequestData; import org.apache.kafka.common.message.LiMoveControllerRequestData; import org.apache.kafka.common.message.ListGroupsRequestData; import org.apache.kafka.common.message.ListGroupsResponseData; @@ -221,6 +222,8 @@ import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse; import org.apache.kafka.common.requests.LiControlledShutdownSkipSafetyCheckRequest; import org.apache.kafka.common.requests.LiControlledShutdownSkipSafetyCheckResponse; +import org.apache.kafka.common.requests.LiListFederatedTopicZnodesRequest; +import org.apache.kafka.common.requests.LiListFederatedTopicZnodesResponse; import org.apache.kafka.common.requests.LiMoveControllerRequest; import org.apache.kafka.common.requests.LiMoveControllerResponse; import org.apache.kafka.common.requests.LiCreateFederatedTopicZnodesRequest; @@ -2029,6 +2032,45 @@ void handleFailure(Throwable throwable) { return new ListTopicsResult(topicListingFuture); } + @Override + public ListFederatedTopicZnodesResult listFederatedTopicZnodes(List federatedTopics, + ListFederatedTopicZnodesOptions options) { + final KafkaFutureImpl> federatedTopicZnodesListingFuture = new KafkaFutureImpl<>(); + List topicsRequested = new ArrayList<>(); + federatedTopics.forEach(topic -> + topicsRequested.add(new LiListFederatedTopicZnodesRequestData.FederatedTopics().setName(topic))); + final long now = time.milliseconds(); + runnable.call(new Call("listFederatedTopicZnodes", calcDeadlineMs(now, options.timeoutMs()), + new LeastLoadedNodeProvider()) { + + @Override + AbstractRequest.Builder createRequest(int timeoutMs) { + return new LiListFederatedTopicZnodesRequest.Builder( + new LiListFederatedTopicZnodesRequestData().setTopics(topicsRequested), (short) 0 + ); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + LiListFederatedTopicZnodesResponse response = (LiListFederatedTopicZnodesResponse) abstractResponse; + Errors error = Errors.forCode(response.data().errorCode()); + if (error != Errors.NONE) { + federatedTopicZnodesListingFuture.completeExceptionally(error.exception()); + return; + } + + List allFederatedTopicZnodes = new ArrayList<>(response.data().topics()); + federatedTopicZnodesListingFuture.complete(allFederatedTopicZnodes); + } + + @Override + void handleFailure(Throwable throwable) { + federatedTopicZnodesListingFuture.completeExceptionally(throwable); + } + }, now); + return new ListFederatedTopicZnodesResult(federatedTopicZnodesListingFuture); + } + @Override public DescribeTopicsResult describeTopics(final Collection topicNames, DescribeTopicsOptions options) { final Map> topicFutures = new HashMap<>(topicNames.size()); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListFederatedTopicZnodesOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListFederatedTopicZnodesOptions.java new file mode 100644 index 0000000000000..3e8a05315a0da --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListFederatedTopicZnodesOptions.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + + +/** + * Options for {@link Admin#listFederatedTopicZnodes()}. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class ListFederatedTopicZnodesOptions extends AbstractOptions { +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListFederatedTopicZnodesResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListFederatedTopicZnodesResult.java new file mode 100644 index 0000000000000..0f997152e21fe --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListFederatedTopicZnodesResult.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import java.util.List; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; + + +/** + * The result of the {@link Admin#listFederatedTopicZnodes()} call. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class ListFederatedTopicZnodesResult { + final KafkaFuture> future; + + ListFederatedTopicZnodesResult(KafkaFuture> future) { + this.future = future; + } + + /** + * Return a future which yields a list of federated topic names + */ + public KafkaFuture> topics() { + return future; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/NoOpAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/NoOpAdminClient.java index 6e49a12f13bff..57a3acc009bf7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/NoOpAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/NoOpAdminClient.java @@ -68,6 +68,12 @@ public ListTopicsResult listTopics(ListTopicsOptions options) { return null; } + @Override + public ListFederatedTopicZnodesResult listFederatedTopicZnodes(List federatedTopics, + ListFederatedTopicZnodesOptions options) { + return null; + } + @Override public DescribeTopicsResult describeTopics(Collection topicNames, DescribeTopicsOptions options) { return null; diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index a4f09b78ac4b1..8de25faaff839 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -116,7 +116,8 @@ public enum ApiKeys { LI_MOVE_CONTROLLER(ApiMessageType.LI_MOVE_CONTROLLER, true), LI_CREATE_FEDERATED_TOPIC_ZNODES(ApiMessageType.LI_CREATE_FEDERATED_TOPIC_ZNODES, false, true), - LI_DELETE_FEDERATED_TOPIC_ZNODES(ApiMessageType.LI_DELETE_FEDERATED_TOPIC_ZNODES, false, true); + LI_DELETE_FEDERATED_TOPIC_ZNODES(ApiMessageType.LI_DELETE_FEDERATED_TOPIC_ZNODES, false, true), + LI_LIST_FEDERATED_TOPIC_ZNODES(ApiMessageType.LI_LIST_FEDERATED_TOPIC_ZNODES); private static final Map> APIS_BY_LISTENER = new EnumMap<>(ApiMessageType.ListenerType.class); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 856c06f584a74..16f3e1dfbccf8 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -313,6 +313,8 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion, return LiCreateFederatedTopicZnodesRequest.parse(buffer, apiVersion); case LI_DELETE_FEDERATED_TOPIC_ZNODES: return LiDeleteFederatedTopicZnodesRequest.parse(buffer, apiVersion); + case LI_LIST_FEDERATED_TOPIC_ZNODES: + return LiListFederatedTopicZnodesRequest.parse(buffer, apiVersion); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 7d0d9bb88438a..0e515450f4309 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -257,6 +257,8 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, ByteBuffer response return LiCreateFederatedTopicZnodesResponse.parse(responseBuffer, version); case LI_DELETE_FEDERATED_TOPIC_ZNODES: return LiDeleteFederatedTopicZnodesResponse.parse(responseBuffer, version); + case LI_LIST_FEDERATED_TOPIC_ZNODES: + return LiListFederatedTopicZnodesResponse.parse(responseBuffer, version); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LiListFederatedTopicZnodesRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LiListFederatedTopicZnodesRequest.java new file mode 100644 index 0000000000000..4794509b837bf --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/LiListFederatedTopicZnodesRequest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import java.nio.ByteBuffer; +import java.util.Collections; +import org.apache.kafka.common.message.LiListFederatedTopicZnodesRequestData; +import org.apache.kafka.common.message.LiListFederatedTopicZnodesResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + + +public class LiListFederatedTopicZnodesRequest extends AbstractRequest { + public static class Builder extends AbstractRequest.Builder { + + private final LiListFederatedTopicZnodesRequestData data; + + public Builder(LiListFederatedTopicZnodesRequestData data, short allowedVersion) { + super(ApiKeys.LI_LIST_FEDERATED_TOPIC_ZNODES, allowedVersion); + this.data = data; + } + + @Override + public LiListFederatedTopicZnodesRequest build(short version) { + return new LiListFederatedTopicZnodesRequest(data, version); + } + + @Override + public String toString() { + return data.toString(); + } + } + + private final LiListFederatedTopicZnodesRequestData data; + + public LiListFederatedTopicZnodesRequest(LiListFederatedTopicZnodesRequestData data, short version) { + super(ApiKeys.LI_LIST_FEDERATED_TOPIC_ZNODES, version); + this.data = data; + } + + @Override + public LiListFederatedTopicZnodesResponse getErrorResponse(int throttleTimeMs, Throwable e) { + LiListFederatedTopicZnodesResponseData data = new LiListFederatedTopicZnodesResponseData(). + setTopics(Collections.emptyList()). + setThrottleTimeMs(throttleTimeMs). + setErrorCode(Errors.forException(e).code()); + return new LiListFederatedTopicZnodesResponse(data, version()); + } + + public static LiListFederatedTopicZnodesRequest parse(ByteBuffer buffer, short version) { + return new LiListFederatedTopicZnodesRequest( + new LiListFederatedTopicZnodesRequestData(new ByteBufferAccessor(buffer), version), version + ); + } + + @Override + public LiListFederatedTopicZnodesRequestData data() { + return data; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LiListFederatedTopicZnodesResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LiListFederatedTopicZnodesResponse.java new file mode 100644 index 0000000000000..feb3719f98d4c --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/LiListFederatedTopicZnodesResponse.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import java.nio.ByteBuffer; +import java.util.Map; +import org.apache.kafka.common.message.LiListFederatedTopicZnodesResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + + +public class LiListFederatedTopicZnodesResponse extends AbstractResponse { + private final LiListFederatedTopicZnodesResponseData data; + private final short version; + + public LiListFederatedTopicZnodesResponse(LiListFederatedTopicZnodesResponseData data, short version) { + super(ApiKeys.LI_LIST_FEDERATED_TOPIC_ZNODES); + this.data = data; + this.version = version; + } + + @Override + public LiListFederatedTopicZnodesResponseData data() { + return data; + } + + @Override + public int throttleTimeMs() { + return data.throttleTimeMs(); + } + + @Override + public Map errorCounts() { + return errorCounts(Errors.forCode(data.errorCode())); + } + + public static LiListFederatedTopicZnodesResponse parse(ByteBuffer buffer, short version) { + return new LiListFederatedTopicZnodesResponse( + new LiListFederatedTopicZnodesResponseData(new ByteBufferAccessor(buffer), version), version + ); + } + + public static LiListFederatedTopicZnodesResponse prepareResponse(Errors error, int throttleTimeMs, short version) { + LiListFederatedTopicZnodesResponseData data = new LiListFederatedTopicZnodesResponseData(); + data.setErrorCode(error.code()); + data.setThrottleTimeMs(throttleTimeMs); + return new LiListFederatedTopicZnodesResponse(data, version); + } + + public short version() { + return version; + } + + @Override + public String toString() { + return data.toString(); + } +} diff --git a/clients/src/main/resources/common/message/LiListFederatedTopicZnodesRequest.json b/clients/src/main/resources/common/message/LiListFederatedTopicZnodesRequest.json new file mode 100644 index 0000000000000..4ba7709ef6220 --- /dev/null +++ b/clients/src/main/resources/common/message/LiListFederatedTopicZnodesRequest.json @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 1005, + "type": "request", + "listeners": ["zkBroker", "broker"], + "name": "LiListFederatedTopicZnodesRequest", + "validVersions": "0-1", + "flexibleVersions": "0+", + "fields": [ + { "name": "Topics", "type": "[]FederatedTopics", "versions": "0+", "about": "The simple name of the federated topics", + "fields": [ + {"name": "Name", "type": "string", "versions": "0+", "about": "The topic name"} + ]} + ] +} diff --git a/clients/src/main/resources/common/message/LiListFederatedTopicZnodesResponse.json b/clients/src/main/resources/common/message/LiListFederatedTopicZnodesResponse.json new file mode 100644 index 0000000000000..1683c71a65238 --- /dev/null +++ b/clients/src/main/resources/common/message/LiListFederatedTopicZnodesResponse.json @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 1005, + "type": "response", + "name": "LiListFederatedTopicZnodesResponse", + "validVersions": "0-1", + "flexibleVersions": "0+", + "fields": [ + { "name": "Topics", "type": "[]string", "versions": "0+", "about": "The name and namespace of the federated topic"}, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The top-level error code." }, + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true, + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." } + ] +} diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index 7cbd95d8ad8de..60cd6d83ad581 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -360,6 +360,12 @@ synchronized public ListTopicsResult listTopics(ListTopicsOptions options) { return new ListTopicsResult(future); } + @Override + public ListFederatedTopicZnodesResult listFederatedTopicZnodes(List federatedTopics, + ListFederatedTopicZnodesOptions options) { + throw new UnsupportedOperationException("Not implemented yet"); + } + @Override synchronized public DescribeTopicsResult describeTopics(Collection topicNames, DescribeTopicsOptions options) { Map> topicDescriptions = new HashMap<>(); diff --git a/core/src/main/scala/kafka/network/RequestConvertToJson.scala b/core/src/main/scala/kafka/network/RequestConvertToJson.scala index cb9ddccf8c29b..9f121a930ed8b 100644 --- a/core/src/main/scala/kafka/network/RequestConvertToJson.scala +++ b/core/src/main/scala/kafka/network/RequestConvertToJson.scala @@ -100,6 +100,7 @@ object RequestConvertToJson { case req: LiMoveControllerRequest => LiMoveControllerRequestDataJsonConverter.write(req.data, request.version()) case req: LiCreateFederatedTopicZnodesRequest => LiCreateFederatedTopicZnodesRequestDataJsonConverter.write(req.data, request.version()) case req: LiDeleteFederatedTopicZnodesRequest => LiDeleteFederatedTopicZnodesRequestDataJsonConverter.write(req.data, request.version()) + case req: LiListFederatedTopicZnodesRequest => LiListFederatedTopicZnodesRequestDataJsonConverter.write(req.data, request.version()) case _ => throw new IllegalStateException(s"ApiKey ${request.apiKey} is not currently handled in `request`, the " + "code should be updated to do so."); } @@ -180,6 +181,7 @@ object RequestConvertToJson { case res: LiMoveControllerResponse => LiMoveControllerResponseDataJsonConverter.write(res.data, version) case res: LiCreateFederatedTopicZnodesResponse => LiCreateFederatedTopicZnodesResponseDataJsonConverter.write(res.data, version) case res: LiDeleteFederatedTopicZnodesResponse => LiDeleteFederatedTopicZnodesResponseDataJsonConverter.write(res.data, version) + case res: LiListFederatedTopicZnodesResponse => LiListFederatedTopicZnodesResponseDataJsonConverter.write(res.data, version) case _ => throw new IllegalStateException(s"ApiKey ${response.apiKey} is not currently handled in `response`, the " + "code should be updated to do so."); } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 854525381a4b6..35e3a405cf7ea 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -262,6 +262,8 @@ class KafkaApis(val requestChannel: RequestChannel, // LI_DELETE_FEDERATED_TOPIC_ZNODES is decoupled from DELETE_TOPICS and only changes the ACL validation behavior // within kafka-server level case ApiKeys.LI_DELETE_FEDERATED_TOPIC_ZNODES => maybeForwardToController(request, handleDeleteFederatedTopicZnodesRequest) + // LI_LIST_FEDERATED_TOPIC_ZNODES is decoupled from LIST_TOPICS and not affect any other operation + case ApiKeys.LI_LIST_FEDERATED_TOPIC_ZNODES => handleListFederatedTopicZnodesRequest(request) case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}") } } catch { @@ -3813,6 +3815,57 @@ class KafkaApis(val requestChannel: RequestChannel, } } } + + def handleListFederatedTopicZnodesRequest(request: RequestChannel.Request): Unit = { + val listfederatedTopicZnodesRequest = request.body[LiListFederatedTopicZnodesRequest] + val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request)) + val requestedTopics = listfederatedTopicZnodesRequest.data().topics() + + // only do authorization on cluster level, users are not expected to track this directly via kafka + if (!authHelper.authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME)) { + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => + LiListFederatedTopicZnodesResponse.prepareResponse( + Errors.CLUSTER_AUTHORIZATION_FAILED, requestThrottleMs, listfederatedTopicZnodesRequest.version() + ) + ) + return + } + + try { + if (requestedTopics.isEmpty) { + // if empty list passed, list all existing federated topics + val allFederatedTopicZnodes = zkSupport.zkClient.getAllFederatedTopics.toList.asJava + + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => + new LiListFederatedTopicZnodesResponse( + new LiListFederatedTopicZnodesResponseData().setTopics(allFederatedTopicZnodes) + .setThrottleTimeMs(requestThrottleMs), listfederatedTopicZnodesRequest.version() + ) + ) + } else { + // if non-empty list passed, only list znode values for the given topics + val foundFederatedTopicZnodes = mutable.Set[String]() + + requestedTopics.forEach(topic => { + val curFederatedTopicZnode = zkSupport.zkClient.getFederatedTopic(topic.name()) + if (curFederatedTopicZnode != null) { + foundFederatedTopicZnodes.add(curFederatedTopicZnode) + } + }) + + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => + new LiListFederatedTopicZnodesResponse( + new LiListFederatedTopicZnodesResponseData() + .setTopics(foundFederatedTopicZnodes.toList.asJava).setThrottleTimeMs(requestThrottleMs), + listfederatedTopicZnodesRequest.version() + ) + ) + } + } catch { + case throwable: Throwable => + requestHelper.sendResponseExemptThrottle(request, listfederatedTopicZnodesRequest.getErrorResponse(throwable)) + } + } } object KafkaApis { diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 837fc0515eaa5..6f95bb0e50f27 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -145,6 +145,16 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, }.toMap } + def getFederatedTopic(topic: String): String = { + val getDataRequest = GetDataRequest(FederatedTopicZnode.path(topic)) + val getDataResponse = retryRequestUntilConnected(getDataRequest) + getDataResponse.resultCode match { + case Code.OK => "/" + FederatedTopicZnode.decode(getDataResponse.data) + "/" + topic + case Code.NONODE => null + case _ => throw getDataResponse.resultException.get + } + } + def getAllFederatedTopics: Set[String] = { val topics = getChildren(FederatedTopicsZNode.path) diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 9aaba26844df3..4117206522c1f 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -136,7 +136,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { def testFederatedTopicCreateDeleteAndGet(): Unit = { client = Admin.create(createConfig) val federatedTopic = Map("federated-test-topic" -> "tracking").asJava - val expectedFedTopic = Seq("/tracking/federated-test-topic") + val expectedFedTopicString = "/tracking/federated-test-topic" + val expectedFedTopic = Seq(expectedFedTopicString) // create one federated topic client.createFederatedTopicZnodes(federatedTopic) // since creation is async, wait for federated topic znodes to be created @@ -146,6 +147,21 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val federatedTopics = zkClient.getAllFederatedTopics assertEquals(1, federatedTopics.size) + // test list federated topic znodes api + // 1. test list all + val allZnodes = client.listFederatedTopicZnodes().topics().get() + assertEquals(1, allZnodes.size()) + assertEquals(expectedFedTopicString, allZnodes.get(0)) + // 2. test list specific success + val expectedSuccess = client.listFederatedTopicZnodes(Collections.singletonList("federated-test-topic"), + new ListFederatedTopicZnodesOptions()).topics().get() + assertEquals(1, expectedSuccess.size()) + assertEquals(expectedFedTopicString, expectedSuccess.get(0)) + // 3. test list specific fail + val expectedFail = client.listFederatedTopicZnodes(Collections.singletonList("non-exist-topic"), + new ListFederatedTopicZnodesOptions()).topics().get() + assertEquals(0, expectedFail.size()) + // delete federated topic znode client.deleteFederatedTopicZnodes(federatedTopic) diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index e0ced640ee1b9..2c1f33bb23e2d 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -667,6 +667,9 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.LI_DELETE_FEDERATED_TOPIC_ZNODES => new LiDeleteFederatedTopicZnodesRequest.Builder(new LiDeleteFederatedTopicZnodesRequestData(), ApiKeys.LI_DELETE_FEDERATED_TOPIC_ZNODES.latestVersion) + case ApiKeys.LI_LIST_FEDERATED_TOPIC_ZNODES => + new LiListFederatedTopicZnodesRequest.Builder(new LiListFederatedTopicZnodesRequestData(), ApiKeys.LI_LIST_FEDERATED_TOPIC_ZNODES.latestVersion) + case _ => throw new IllegalArgumentException("Unsupported API key " + apiKey) }