From a1ca4020be62e5f27601ba9c06a84fb376bb48c3 Mon Sep 17 00:00:00 2001 From: Ke Hu Date: Mon, 30 Oct 2023 14:44:10 -0700 Subject: [PATCH 1/2] [LI-HOTFIX] Add federated topic znode delete rpc (#481) Add federated topic znode delete rpc LI_DESCRIPTION = In order to make kafka broker understand new acl, it will need to understand the topic name and its corresponding namespace. So when a new kafka topic is deleted by xmd-service, the corresponding znode will also be deleted in the above new znode directory. Following rpcs are added: 1. delete federated topic rpc that deletes the znode entry under /federatedTopics/topicName (value = namespace name) 2. zk function to get all federated topics as a set of string, in the format of /namespaceName/topicName for easier usage and avoid duplicate code in kafka-server Added integ-test to verify create and delete logic EXIT_CRITERIA = We can deprecate this pr when all kafka clients have been migrated to xinfra clients and all topic CUDs go through xmd, then we don't need kafka broker to understand both old and new topic acl, it will only need to understand the new acl. --- .../org/apache/kafka/clients/admin/Admin.java | 19 +++++ ...ateOrDeleteFederatedTopicZnodesResult.java | 2 +- .../DeleteFederatedTopicZnodesOptions.java | 58 ++++++++++++++ .../kafka/clients/admin/KafkaAdminClient.java | 42 ++++++++++ .../kafka/clients/admin/NoOpAdminClient.java | 6 ++ .../apache/kafka/common/protocol/ApiKeys.java | 3 +- .../common/requests/AbstractRequest.java | 2 + .../common/requests/AbstractResponse.java | 2 + .../LiDeleteFederatedTopicZnodesRequest.java | 70 +++++++++++++++++ .../LiDeleteFederatedTopicZnodesResponse.java | 77 +++++++++++++++++++ .../LiDeleteFederatedTopicZnodesRequest.json | 32 ++++++++ .../LiDeleteFederatedTopicZnodesResponse.json | 27 +++++++ .../kafka/clients/admin/MockAdminClient.java | 5 ++ .../kafka/network/RequestConvertToJson.scala | 2 + .../main/scala/kafka/server/KafkaApis.scala | 71 ++++++++++++++++- .../main/scala/kafka/zk/KafkaZkClient.scala | 26 +++++++ .../kafka/api/BaseAdminIntegrationTest.scala | 9 +++ .../api/PlaintextAdminIntegrationTest.scala | 19 ++++- .../unit/kafka/server/RequestQuotaTest.scala | 3 + 19 files changed, 470 insertions(+), 5 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/DeleteFederatedTopicZnodesOptions.java create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/LiDeleteFederatedTopicZnodesRequest.java create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/LiDeleteFederatedTopicZnodesResponse.java create mode 100644 clients/src/main/resources/common/message/LiDeleteFederatedTopicZnodesRequest.json create mode 100644 clients/src/main/resources/common/message/LiDeleteFederatedTopicZnodesResponse.json diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index cda3af2bccf80..b6a7ec7077e13 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -199,6 +199,13 @@ default CreateTopicsResult createTopics(Collection newTopics) { */ CreateTopicsResult createTopics(Collection newTopics, CreateTopicsOptions options); + /** + * Create federated topic znodes, with znode name as the topic name and data as namespace name. + * This operation is decoupled from createTopics and will eventually only change + * the ACL validation behavior within kafka-server level + * @param federatedTopics map of topic name to namespace name for federated topics + * @return The CreateOrDeleteFederatedTopicZnodesResult + */ default CreateOrDeleteFederatedTopicZnodesResult createFederatedTopicZnodes(Map federatedTopics) { return createFederatedTopicZnodes(federatedTopics, new CreateFederatedTopicZnodesOptions()); } @@ -270,6 +277,18 @@ default DeleteTopicsResult deleteTopics(TopicCollection topics) { */ DeleteTopicsResult deleteTopics(TopicCollection topics, DeleteTopicsOptions options); + /** + * Delete federated topic znodes. This operation is decoupled from deleteTopics and will eventually only change + * the ACL validation behavior within kafka-server level + * @param federatedTopics map of topic name to namespace name for federated topics + * @return The CreateOrDeleteFederatedTopicZnodesResult + */ + default CreateOrDeleteFederatedTopicZnodesResult deleteFederatedTopicZnodes(Map federatedTopics) { + return deleteFederatedTopicZnodes(federatedTopics, new DeleteFederatedTopicZnodesOptions()); + } + + CreateOrDeleteFederatedTopicZnodesResult deleteFederatedTopicZnodes(Map federatedTopics, DeleteFederatedTopicZnodesOptions options); + /** * List the topics available in the cluster with the default options. *

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateOrDeleteFederatedTopicZnodesResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateOrDeleteFederatedTopicZnodesResult.java index 9c7a4d6a090ac..7932614e6721d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateOrDeleteFederatedTopicZnodesResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateOrDeleteFederatedTopicZnodesResult.java @@ -22,7 +22,7 @@ import org.apache.kafka.common.annotation.InterfaceStability; /** - * The result of {@link Admin#CreateFederatedTopicZnode()} or {@link Admin#DeleteFederatedTopicZnode()}. + * The result of {@link Admin#CreateFederatedTopicZnodes()} or {@link Admin#DeleteFederatedTopicZnodes()}. * * The API of this class is evolving, see {@link Admin} for details. */ diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteFederatedTopicZnodesOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteFederatedTopicZnodesOptions.java new file mode 100644 index 0000000000000..d9b58e95045aa --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteFederatedTopicZnodesOptions.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import java.util.Map; +import org.apache.kafka.common.annotation.InterfaceStability; + + +/** + * Options for {@link Admin#deleteFederatedTopicZnodes(Map, DeleteFederatedTopicZnodesOptions)} (Collection)}. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class DeleteFederatedTopicZnodesOptions extends AbstractOptions { + private boolean retryOnQuotaViolation = true; + + /** + * Set the timeout in milliseconds for this operation or {@code null} if the default api timeout for the + * AdminClient should be used. + * + */ + // This method is retained to keep binary compatibility with 0.11 + public DeleteFederatedTopicZnodesOptions timeoutMs(Integer timeoutMs) { + this.timeoutMs = timeoutMs; + return this; + } + + /** + * Set to true if quota violation should be automatically retried. + */ + public DeleteFederatedTopicZnodesOptions retryOnQuotaViolation(boolean retryOnQuotaViolation) { + this.retryOnQuotaViolation = retryOnQuotaViolation; + return this; + } + + /** + * Returns true if quota violation should be automatically retried. + */ + public boolean shouldRetryOnQuotaViolation() { + return retryOnQuotaViolation; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 6fd895d420142..e400ffe9c66a7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -142,6 +142,7 @@ import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity; import org.apache.kafka.common.message.LiControlledShutdownSkipSafetyCheckRequestData; import org.apache.kafka.common.message.LiCreateFederatedTopicZnodesRequestData; +import org.apache.kafka.common.message.LiDeleteFederatedTopicZnodesRequestData; import org.apache.kafka.common.message.LiMoveControllerRequestData; import org.apache.kafka.common.message.ListGroupsRequestData; import org.apache.kafka.common.message.ListGroupsResponseData; @@ -224,6 +225,8 @@ import org.apache.kafka.common.requests.LiMoveControllerResponse; import org.apache.kafka.common.requests.LiCreateFederatedTopicZnodesRequest; import org.apache.kafka.common.requests.LiCreateFederatedTopicZnodesResponse; +import org.apache.kafka.common.requests.LiDeleteFederatedTopicZnodesRequest; +import org.apache.kafka.common.requests.LiDeleteFederatedTopicZnodesResponse; import org.apache.kafka.common.requests.ListGroupsRequest; import org.apache.kafka.common.requests.ListGroupsResponse; import org.apache.kafka.common.requests.ListOffsetsRequest; @@ -1750,6 +1753,45 @@ else if (topics instanceof TopicNameCollection) throw new IllegalArgumentException("The TopicCollection: " + topics + " provided did not match any supported classes for deleteTopics."); } + @Override + public CreateOrDeleteFederatedTopicZnodesResult deleteFederatedTopicZnodes(Map federatedTopics, + DeleteFederatedTopicZnodesOptions options) { + final Map> topicFutures = new HashMap<>(federatedTopics.size()); + final long now = time.milliseconds(); + List topics = new ArrayList<>(); + federatedTopics.forEach((topic, namespace) -> { + topics.add(new LiDeleteFederatedTopicZnodesRequestData.FederatedTopics().setName(topic).setNamespace(namespace)); + topicFutures.put(topic, new KafkaFutureImpl<>()); + }); + runnable.call(new Call("deleteFederatedTopicsZnode", calcDeadlineMs(now, options.timeoutMs()), + new ControllerNodeProvider()) { + @Override + AbstractRequest.Builder createRequest(int timeoutMs) { + return new LiDeleteFederatedTopicZnodesRequest.Builder(new LiDeleteFederatedTopicZnodesRequestData() + .setTopics(topics) + .setTimeoutMs(timeoutMs), (short) 0); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + LiDeleteFederatedTopicZnodesResponse response = (LiDeleteFederatedTopicZnodesResponse) abstractResponse; + Errors errors = Errors.forCode(response.data().errorCode()); + if (errors != Errors.NONE) { + completeAllExceptionally(topicFutures.values(), errors.exception()); + return; + } + topicFutures.values().forEach(f -> f.complete(null)); + } + + @Override + void handleFailure(Throwable throwable) { + // Fail all the other remaining futures + completeAllExceptionally(topicFutures.values(), throwable); + } + }, now); + return new CreateOrDeleteFederatedTopicZnodesResult(new HashMap<>(topicFutures)); + } + private Map> handleDeleteTopicsUsingNames(final Collection topicNames, final DeleteTopicsOptions options) { final Map> topicFutures = new HashMap<>(topicNames.size()); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/NoOpAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/NoOpAdminClient.java index da4d0d50ece66..6e49a12f13bff 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/NoOpAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/NoOpAdminClient.java @@ -57,6 +57,12 @@ public DeleteTopicsResult deleteTopics(TopicCollection topics, DeleteTopicsOptio return null; } + @Override + public CreateOrDeleteFederatedTopicZnodesResult deleteFederatedTopicZnodes(Map federatedTopics, + DeleteFederatedTopicZnodesOptions options) { + return null; + } + @Override public ListTopicsResult listTopics(ListTopicsOptions options) { return null; diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 6153be0d6b0d0..a4f09b78ac4b1 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -115,7 +115,8 @@ public enum ApiKeys { LI_COMBINED_CONTROL(ApiMessageType.LI_COMBINED_CONTROL, true), LI_MOVE_CONTROLLER(ApiMessageType.LI_MOVE_CONTROLLER, true), - LI_CREATE_FEDERATED_TOPIC_ZNODES(ApiMessageType.LI_CREATE_FEDERATED_TOPIC_ZNODES, false, true); + LI_CREATE_FEDERATED_TOPIC_ZNODES(ApiMessageType.LI_CREATE_FEDERATED_TOPIC_ZNODES, false, true), + LI_DELETE_FEDERATED_TOPIC_ZNODES(ApiMessageType.LI_DELETE_FEDERATED_TOPIC_ZNODES, false, true); private static final Map> APIS_BY_LISTENER = new EnumMap<>(ApiMessageType.ListenerType.class); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 02f5577b38eb3..856c06f584a74 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -311,6 +311,8 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion, return LiMoveControllerRequest.parse(buffer, apiVersion); case LI_CREATE_FEDERATED_TOPIC_ZNODES: return LiCreateFederatedTopicZnodesRequest.parse(buffer, apiVersion); + case LI_DELETE_FEDERATED_TOPIC_ZNODES: + return LiDeleteFederatedTopicZnodesRequest.parse(buffer, apiVersion); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 30935e0a0b995..7d0d9bb88438a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -255,6 +255,8 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, ByteBuffer response return LiMoveControllerResponse.parse(responseBuffer, version); case LI_CREATE_FEDERATED_TOPIC_ZNODES: return LiCreateFederatedTopicZnodesResponse.parse(responseBuffer, version); + case LI_DELETE_FEDERATED_TOPIC_ZNODES: + return LiDeleteFederatedTopicZnodesResponse.parse(responseBuffer, version); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LiDeleteFederatedTopicZnodesRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LiDeleteFederatedTopicZnodesRequest.java new file mode 100644 index 0000000000000..9d81c24807dcf --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/LiDeleteFederatedTopicZnodesRequest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import java.nio.ByteBuffer; +import org.apache.kafka.common.message.LiDeleteFederatedTopicZnodesRequestData; +import org.apache.kafka.common.message.LiDeleteFederatedTopicZnodesResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + + +public class LiDeleteFederatedTopicZnodesRequest extends AbstractRequest { + public static class Builder extends AbstractRequest.Builder { + private final LiDeleteFederatedTopicZnodesRequestData data; + + public Builder(LiDeleteFederatedTopicZnodesRequestData data, short allowedVersion) { + super(ApiKeys.LI_DELETE_FEDERATED_TOPIC_ZNODES, allowedVersion); + this.data = data; + } + + @Override + public LiDeleteFederatedTopicZnodesRequest build(short version) { + return new LiDeleteFederatedTopicZnodesRequest(data, version); + } + + @Override + public String toString() { + return data.toString(); + } + } + + private final LiDeleteFederatedTopicZnodesRequestData data; + + LiDeleteFederatedTopicZnodesRequest(LiDeleteFederatedTopicZnodesRequestData data, short version) { + super(ApiKeys.LI_DELETE_FEDERATED_TOPIC_ZNODES, version); + this.data = data; + } + + public static LiDeleteFederatedTopicZnodesRequest parse(ByteBuffer buffer, short version) { + return new LiDeleteFederatedTopicZnodesRequest(new LiDeleteFederatedTopicZnodesRequestData(new ByteBufferAccessor(buffer), version), version); + } + + @Override + public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { + LiDeleteFederatedTopicZnodesResponseData data = new LiDeleteFederatedTopicZnodesResponseData() + .setErrorCode(Errors.forException(e).code()); + return new LiDeleteFederatedTopicZnodesResponse(data, version()); + } + + @Override + public LiDeleteFederatedTopicZnodesRequestData data() { + return data; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LiDeleteFederatedTopicZnodesResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LiDeleteFederatedTopicZnodesResponse.java new file mode 100644 index 0000000000000..c3c69190514fe --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/LiDeleteFederatedTopicZnodesResponse.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Map; +import org.apache.kafka.common.message.LiDeleteFederatedTopicZnodesResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + + +public class LiDeleteFederatedTopicZnodesResponse extends AbstractResponse { + private final LiDeleteFederatedTopicZnodesResponseData data; + private final short version; + + public LiDeleteFederatedTopicZnodesResponse(LiDeleteFederatedTopicZnodesResponseData data, short version) { + super(ApiKeys.LI_DELETE_FEDERATED_TOPIC_ZNODES); + this.data = data; + this.version = version; + } + + public Errors error() { + return Errors.forCode(data.errorCode()); + } + + @Override + public Map errorCounts() { + return Collections.singletonMap(error(), 1); + } + + @Override + public LiDeleteFederatedTopicZnodesResponseData data() { + return data; + } + + public static LiDeleteFederatedTopicZnodesResponse prepareResponse(Errors error, int throttleTimeMs, short version) { + LiDeleteFederatedTopicZnodesResponseData data = new LiDeleteFederatedTopicZnodesResponseData(); + data.setErrorCode(error.code()); + data.setThrottleTimeMs(throttleTimeMs); + return new LiDeleteFederatedTopicZnodesResponse(data, version); + } + + public static LiDeleteFederatedTopicZnodesResponse parse(ByteBuffer buffer, short version) { + return new LiDeleteFederatedTopicZnodesResponse(new LiDeleteFederatedTopicZnodesResponseData(new ByteBufferAccessor(buffer), version), version); + } + + @Override + public int throttleTimeMs() { + return data.throttleTimeMs(); + } + + public short version() { + return version; + } + + @Override + public String toString() { + return data.toString(); + } +} diff --git a/clients/src/main/resources/common/message/LiDeleteFederatedTopicZnodesRequest.json b/clients/src/main/resources/common/message/LiDeleteFederatedTopicZnodesRequest.json new file mode 100644 index 0000000000000..4571a0f6ee4f4 --- /dev/null +++ b/clients/src/main/resources/common/message/LiDeleteFederatedTopicZnodesRequest.json @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 1004, + "type": "request", + "listeners": ["zkBroker", "broker", "controller"], + "name": "LiDeleteFederatedTopicZnodesRequest", + "validVersions": "0-1", + "flexibleVersions": "0+", + "fields": [ + { "name": "Topics", "type": "[]FederatedTopics", "versions": "0+", "about": "The name and namespace of the federated topic", + "fields": [ + {"name": "Name", "type": "string", "versions": "0+", "about": "The topic name"}, + {"name": "Namespace", "type": "string", "versions": "0+", "about": "The namespace of the topic"} + ]}, + { "name": "TimeoutMs", "type": "int32", "versions": "0+", + "about": "The length of time in milliseconds to wait for the deletions to complete." } + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/LiDeleteFederatedTopicZnodesResponse.json b/clients/src/main/resources/common/message/LiDeleteFederatedTopicZnodesResponse.json new file mode 100644 index 0000000000000..2594f10f8b22c --- /dev/null +++ b/clients/src/main/resources/common/message/LiDeleteFederatedTopicZnodesResponse.json @@ -0,0 +1,27 @@ +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 1004, + "type": "response", + "name": "LiDeleteFederatedTopicZnodesResponse", + "validVersions": "0-1", + "flexibleVersions": "0+", + "fields": [ + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The top-level error code." }, + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true, + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." } + ] +} \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index 986f7e17619fc..7cbd95d8ad8de 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -413,6 +413,11 @@ else if (topics instanceof TopicNameCollection) return result; } + @Override + public CreateOrDeleteFederatedTopicZnodesResult deleteFederatedTopicZnodes(Map federatedTopics, DeleteFederatedTopicZnodesOptions options) { + throw new UnsupportedOperationException("Not implemented yet"); + } + private Map> handleDeleteTopicsUsingNames(Collection topicNameCollection, DeleteTopicsOptions options) { Map> deleteTopicsResult = new HashMap<>(); Collection topicNames = new ArrayList<>(topicNameCollection); diff --git a/core/src/main/scala/kafka/network/RequestConvertToJson.scala b/core/src/main/scala/kafka/network/RequestConvertToJson.scala index 568ca97ed1e37..cb9ddccf8c29b 100644 --- a/core/src/main/scala/kafka/network/RequestConvertToJson.scala +++ b/core/src/main/scala/kafka/network/RequestConvertToJson.scala @@ -99,6 +99,7 @@ object RequestConvertToJson { case req: LiCombinedControlRequest => LiCombinedControlRequestDataJsonConverter.write(req.data, request.version()) case req: LiMoveControllerRequest => LiMoveControllerRequestDataJsonConverter.write(req.data, request.version()) case req: LiCreateFederatedTopicZnodesRequest => LiCreateFederatedTopicZnodesRequestDataJsonConverter.write(req.data, request.version()) + case req: LiDeleteFederatedTopicZnodesRequest => LiDeleteFederatedTopicZnodesRequestDataJsonConverter.write(req.data, request.version()) case _ => throw new IllegalStateException(s"ApiKey ${request.apiKey} is not currently handled in `request`, the " + "code should be updated to do so."); } @@ -178,6 +179,7 @@ object RequestConvertToJson { case res: LiCombinedControlResponse => LiCombinedControlResponseDataJsonConverter.write(res.data, version) case res: LiMoveControllerResponse => LiMoveControllerResponseDataJsonConverter.write(res.data, version) case res: LiCreateFederatedTopicZnodesResponse => LiCreateFederatedTopicZnodesResponseDataJsonConverter.write(res.data, version) + case res: LiDeleteFederatedTopicZnodesResponse => LiDeleteFederatedTopicZnodesResponseDataJsonConverter.write(res.data, version) case _ => throw new IllegalStateException(s"ApiKey ${response.apiKey} is not currently handled in `response`, the " + "code should be updated to do so."); } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index c89339c027cd5..854525381a4b6 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -256,7 +256,12 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.LI_CONTROLLED_SHUTDOWN_SKIP_SAFETY_CHECK => handleLiControlledShutdownSkipSafetyCheck(request) case ApiKeys.LI_COMBINED_CONTROL => handleLiCombinedControlRequest(request, requestLocal) case ApiKeys.LI_MOVE_CONTROLLER => handleMoveControllerRequest(request) + // LI_CREATE_FEDERATED_TOPIC_ZNODES is decoupled from CREATE_TOPICS and only changes the ACL validation behavior + // within kafka-server level case ApiKeys.LI_CREATE_FEDERATED_TOPIC_ZNODES => maybeForwardToController(request, handleMarkFederatedTopicRequest) + // LI_DELETE_FEDERATED_TOPIC_ZNODES is decoupled from DELETE_TOPICS and only changes the ACL validation behavior + // within kafka-server level + case ApiKeys.LI_DELETE_FEDERATED_TOPIC_ZNODES => maybeForwardToController(request, handleDeleteFederatedTopicZnodesRequest) case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}") } } catch { @@ -3732,7 +3737,7 @@ class KafkaApis(val requestChannel: RequestChannel, } try { - toCreate.foreach( entry => { + toCreate.foreach(entry => { zkSupport.zkClient.createFederatedTopicZNode(entry._1, entry._2) }) requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => @@ -3744,6 +3749,70 @@ class KafkaApis(val requestChannel: RequestChannel, } } } + + + def handleDeleteFederatedTopicZnodesRequest(request: RequestChannel.Request): Unit = { + val federatedTopicZnodesDeleteRequest = request.body[LiDeleteFederatedTopicZnodesRequest] + val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request)) + + if (!zkSupport.controller.isActive) { + requestHelper.sendResponseExemptThrottle(request, + LiDeleteFederatedTopicZnodesResponse.prepareResponse(Errors.NOT_CONTROLLER, 0, federatedTopicZnodesDeleteRequest.version()) + ) + } else if (!zkSupport.controller.topicDeletionManager.isDeleteTopicEnabled) { + val error = if (request.context.apiVersion < 3) Errors.INVALID_REQUEST else Errors.TOPIC_DELETION_DISABLED + requestHelper.sendResponseExemptThrottle(request, + LiDeleteFederatedTopicZnodesResponse.prepareResponse(error, 0, federatedTopicZnodesDeleteRequest.version()) + ) + } else { + val allTopics = mutable.Set[String]() + federatedTopicZnodesDeleteRequest.data().topics().forEach(federatedTopic => { + allTopics += federatedTopic.name() + }) + val results = new DeletableTopicResultCollection(allTopics.size) + val toDelete = mutable.Set[String]() + + allTopics.foreach(topic => { + results.add(new DeletableTopicResult().setName(topic)) + }) + + val authorizedDescribeTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, + results.asScala.filter(result => result.name() != null))(_.name) + val authorizedDeleteTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC, + results.asScala.filter(result => result.name() != null))(_.name) + results.forEach { topic => + val unresolvedTopicId = topic.topicId() != Uuid.ZERO_UUID && topic.name() == null + if (unresolvedTopicId) { + topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code) + } else if (!authorizedDescribeTopics.contains(topic.name)) { + + // Because the client does not have Describe permission, the name should + // not be returned in the response. Note, however, that we do not consider + // the topicId itself to be sensitive, so there is no reason to obscure + // this case with `UNKNOWN_TOPIC_ID`. + topic.setName(null) + topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) + } else if (!authorizedDeleteTopics.contains(topic.name)) { + topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) + } else { + toDelete += topic.name + } + } + + try { + toDelete.foreach(federatedTopic => { + zkSupport.zkClient.deleteFederatedTopicZNode(federatedTopic) + }) + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => + LiDeleteFederatedTopicZnodesResponse.prepareResponse(Errors.NONE, requestThrottleMs, + federatedTopicZnodesDeleteRequest.version()) + ) + } catch { + case throwable: Throwable => + requestHelper.sendResponseExemptThrottle(request, federatedTopicZnodesDeleteRequest.getErrorResponse(throwable)) + } + } + } } object KafkaApis { diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index dcc133acafd70..837fc0515eaa5 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -145,6 +145,28 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, }.toMap } + def getAllFederatedTopics: Set[String] = { + val topics = getChildren(FederatedTopicsZNode.path) + + val merge: ((String, String)) => String = { + case (key, value) => "/" + value + "/" + key + } + + val getDataRequests = topics.map(topic => GetDataRequest( + FederatedTopicZnode.path(topic), + ctx = Some(topic))) + + val getDataResponses = retryRequestsUntilConnected(getDataRequests) + getDataResponses.flatMap { getDataResponse => + val topic = getDataResponse.ctx.get.asInstanceOf[String] + getDataResponse.resultCode match { + case Code.OK => Some(topic, FederatedTopicZnode.decode(getDataResponse.data)) + case Code.NONODE => None + case _ => throw getDataResponse.resultException.get + } + }.toMap.map(merge) + }.toSet + /** * Registers a given broker in zookeeper as the controller and increments controller epoch. * @param controllerId the id of the broker that is to be registered as the controller. @@ -1868,6 +1890,10 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, createRecursive(path, FederatedTopicZnode.encode(namespace)) } + def deleteFederatedTopicZNode(topic: String): Unit = { + deletePath(FederatedTopicZnode.path(topic), ZkVersion.MatchAnyVersion, false) + } + private def setConsumerOffset(group: String, topicPartition: TopicPartition, offset: Long): SetDataResponse = { val setDataRequest = SetDataRequest(ConsumerOffset.path(group, topicPartition.topic, topicPartition.partition), ConsumerOffset.encode(offset), ZkVersion.MatchAnyVersion) diff --git a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala index 091b5da690a4d..ef5a2093a3a92 100644 --- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala @@ -23,6 +23,7 @@ import kafka.security.authorizer.AclEntry import kafka.server.KafkaConfig import kafka.utils.Logging import kafka.utils.TestUtils._ +import kafka.zk.KafkaZkClient import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, CreateTopicsOptions, CreateTopicsResult, DescribeClusterOptions, DescribeTopicsOptions, NewTopic, TopicDescription} import org.apache.kafka.common.Uuid import org.apache.kafka.common.acl.AclOperation @@ -222,6 +223,14 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg }, "timed out waiting for topics") } + def waitForFederatedTopicZnodes(client: KafkaZkClient, expectedPresent: Seq[String], expectedMissing: Seq[String]): Unit = { + waitUntilTrue(() => { + val topics = client.getAllFederatedTopics + expectedPresent.forall(topicName => topics.contains(topicName)) && + expectedMissing.forall(topicName => !topics.contains(topicName)) + }, "timed out waiting for federated topics") + } + def getTopicMetadata(client: Admin, topic: String, describeOptions: DescribeTopicsOptions = new DescribeTopicsOptions, diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 406b086e4b07d..9aaba26844df3 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -136,9 +136,24 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { def testFederatedTopicCreateDeleteAndGet(): Unit = { client = Admin.create(createConfig) val federatedTopic = Map("federated-test-topic" -> "tracking").asJava - - // create the federated topic znode + val expectedFedTopic = Seq("/tracking/federated-test-topic") + // create one federated topic client.createFederatedTopicZnodes(federatedTopic) + // since creation is async, wait for federated topic znodes to be created + waitForFederatedTopicZnodes(zkClient, expectedFedTopic, List()) + + // check federated topic is created + val federatedTopics = zkClient.getAllFederatedTopics + assertEquals(1, federatedTopics.size) + + // delete federated topic znode + client.deleteFederatedTopicZnodes(federatedTopic) + + waitForFederatedTopicZnodes(zkClient, List(), expectedFedTopic) + + // after deletion, federated topic znodes should be empty + val result1 = zkClient.getAllFederatedTopics + assertEquals(0, result1.size) } @Test diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index e3ebf549d15cd..e0ced640ee1b9 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -664,6 +664,9 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.LI_CREATE_FEDERATED_TOPIC_ZNODES => new LiCreateFederatedTopicZnodesRequest.Builder(new LiCreateFederatedTopicZnodesRequestData(), ApiKeys.LI_CREATE_FEDERATED_TOPIC_ZNODES.latestVersion) + case ApiKeys.LI_DELETE_FEDERATED_TOPIC_ZNODES => + new LiDeleteFederatedTopicZnodesRequest.Builder(new LiDeleteFederatedTopicZnodesRequestData(), ApiKeys.LI_DELETE_FEDERATED_TOPIC_ZNODES.latestVersion) + case _ => throw new IllegalArgumentException("Unsupported API key " + apiKey) } From 3ff58711cf3578995e6d48d3ec2e933fc48f2a91 Mon Sep 17 00:00:00 2001 From: "Joseph (Ting-Chou) Lin" Date: Mon, 30 Oct 2023 15:36:24 -0700 Subject: [PATCH 2/2] [LI-CHERRY-PICK] KAFKA-13457: SocketChannel in Acceptor#accept is not closed upon IOException (#11504) (#486) This patch ensures that SocketChannel in Acceptor#accept is closed if an IOException is thrown while the socket is configured. Reviewers: Luke Chen , David Jacot Co-authored-by: Haoze Wu <18595686+functioner@users.noreply.github.com> --- .../scala/kafka/network/SocketServer.scala | 22 ++++++++---- .../unit/kafka/network/SocketServerTest.scala | 34 +++++++++++++++++++ 2 files changed, 49 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 42f454b4842a8..690ab892aee22 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -84,7 +84,7 @@ class SocketServer(val config: KafkaConfig, private val maxQueuedRequests = config.queuedMaxRequests - private val nodeId = config.brokerId + protected val nodeId = config.brokerId private val logContext = new LogContext(s"[SocketServer listenerType=${apiVersionManager.listenerType}, nodeId=$nodeId] ") @@ -291,7 +291,7 @@ class SocketServer(val config: KafkaConfig, } } - private def createAcceptor(endPoint: EndPoint, metricPrefix: String) : Acceptor = { + protected def createAcceptor(endPoint: EndPoint, metricPrefix: String) : Acceptor = { val sendBufferSize = config.socketSendBufferBytes val recvBufferSize = config.socketReceiveBufferBytes new Acceptor(endPoint, sendBufferSize, recvBufferSize, nodeId, connectionQuotas, metricPrefix, time) @@ -726,11 +726,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint, val socketChannel = serverSocketChannel.accept() try { connectionQuotas.inc(endPoint.listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter) - socketChannel.configureBlocking(false) - socketChannel.socket().setTcpNoDelay(true) - socketChannel.socket().setKeepAlive(true) - if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) - socketChannel.socket().setSendBufferSize(sendBufferSize) + configureAcceptedSocketChannel(socketChannel) Some(socketChannel) } catch { case e: TooManyConnectionsException => @@ -743,9 +739,21 @@ private[kafka] class Acceptor(val endPoint: EndPoint, val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs) None + case e: IOException => + error(s"Encountered an error while configuring the connection, closing it.", e) + close(endPoint.listenerName, socketChannel) + None } } + protected def configureAcceptedSocketChannel(socketChannel: SocketChannel): Unit = { + socketChannel.configureBlocking(false) + socketChannel.socket().setTcpNoDelay(true) + socketChannel.socket().setKeepAlive(true) + if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) + socketChannel.socket().setSendBufferSize(sendBufferSize) + } + /** * Close sockets for any connections that have been throttled. */ diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 6e5daebb984bd..1183a31fde513 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -29,6 +29,7 @@ import com.fasterxml.jackson.databind.node.{JsonNodeFactory, ObjectNode, TextNod import com.yammer.metrics.core.{Gauge, Meter} import javax.net.ssl._ +import kafka.cluster.EndPoint import kafka.metrics.KafkaYammerMetrics import kafka.security.CredentialProvider import kafka.server.{KafkaConfig, Observer, SimpleApiVersionManager, ThrottleCallback, ThrottledChannel} @@ -873,6 +874,39 @@ class SocketServerTest { } } + @Test + def testExceptionInAcceptor(): Unit = { + val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0) + val serverMetrics = new Metrics() + + val overrideServer = new SocketServer(KafkaConfig.fromProps(overrideProps), serverMetrics, + Time.SYSTEM, credentialProvider, observer, apiVersionManager) { + + // same as SocketServer.createAcceptor, + // except the Acceptor overriding a method to inject the exception + override protected def createAcceptor(endPoint: EndPoint, metricPrefix: String): Acceptor = { + val sendBufferSize = config.socketSendBufferBytes + val recvBufferSize = config.socketReceiveBufferBytes + new Acceptor(endPoint, sendBufferSize, recvBufferSize, nodeId, connectionQuotas, metricPrefix, time) { + override protected def configureAcceptedSocketChannel(socketChannel: SocketChannel): Unit = { + assertEquals(1, connectionQuotas.get(socketChannel.socket.getInetAddress)) + throw new IOException("test injected IOException") + } + } + } + } + + try { + overrideServer.startup() + val conn = connect(overrideServer) + conn.setSoTimeout(3000) + assertEquals(-1, conn.getInputStream.read()) + assertEquals(0, overrideServer.connectionQuotas.get(conn.getInetAddress)) + } finally { + shutdownServerAndMetrics(overrideServer) + } + } + @Test def testConnectionRatePerIp(): Unit = { val defaultTimeoutMs = 2000