Skip to content

Commit

Permalink
Merge branch '3.0-li' into tlin-patch5
Browse files Browse the repository at this point in the history
  • Loading branch information
lmr3796 authored Oct 30, 2023
2 parents f93d2a5 + 3ff5871 commit 859b9c7
Show file tree
Hide file tree
Showing 21 changed files with 519 additions and 12 deletions.
19 changes: 19 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,13 @@ default CreateTopicsResult createTopics(Collection<NewTopic> newTopics) {
*/
CreateTopicsResult createTopics(Collection<NewTopic> newTopics, CreateTopicsOptions options);

/**
* Create federated topic znodes, with znode name as the topic name and data as namespace name.
* This operation is decoupled from createTopics and will eventually only change
* the ACL validation behavior within kafka-server level
* @param federatedTopics map of topic name to namespace name for federated topics
* @return The CreateOrDeleteFederatedTopicZnodesResult
*/
default CreateOrDeleteFederatedTopicZnodesResult createFederatedTopicZnodes(Map<String, String> federatedTopics) {
return createFederatedTopicZnodes(federatedTopics, new CreateFederatedTopicZnodesOptions());
}
Expand Down Expand Up @@ -270,6 +277,18 @@ default DeleteTopicsResult deleteTopics(TopicCollection topics) {
*/
DeleteTopicsResult deleteTopics(TopicCollection topics, DeleteTopicsOptions options);

/**
* Delete federated topic znodes. This operation is decoupled from deleteTopics and will eventually only change
* the ACL validation behavior within kafka-server level
* @param federatedTopics map of topic name to namespace name for federated topics
* @return The CreateOrDeleteFederatedTopicZnodesResult
*/
default CreateOrDeleteFederatedTopicZnodesResult deleteFederatedTopicZnodes(Map<String, String> federatedTopics) {
return deleteFederatedTopicZnodes(federatedTopics, new DeleteFederatedTopicZnodesOptions());
}

CreateOrDeleteFederatedTopicZnodesResult deleteFederatedTopicZnodes(Map<String, String> federatedTopics, DeleteFederatedTopicZnodesOptions options);

/**
* List the topics available in the cluster with the default options.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.kafka.common.annotation.InterfaceStability;

/**
* The result of {@link Admin#CreateFederatedTopicZnode()} or {@link Admin#DeleteFederatedTopicZnode()}.
* The result of {@link Admin#CreateFederatedTopicZnodes()} or {@link Admin#DeleteFederatedTopicZnodes()}.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin;

import java.util.Map;
import org.apache.kafka.common.annotation.InterfaceStability;


/**
* Options for {@link Admin#deleteFederatedTopicZnodes(Map, DeleteFederatedTopicZnodesOptions)} (Collection)}.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DeleteFederatedTopicZnodesOptions extends AbstractOptions<DeleteFederatedTopicZnodesOptions> {
private boolean retryOnQuotaViolation = true;

/**
* Set the timeout in milliseconds for this operation or {@code null} if the default api timeout for the
* AdminClient should be used.
*
*/
// This method is retained to keep binary compatibility with 0.11
public DeleteFederatedTopicZnodesOptions timeoutMs(Integer timeoutMs) {
this.timeoutMs = timeoutMs;
return this;
}

/**
* Set to true if quota violation should be automatically retried.
*/
public DeleteFederatedTopicZnodesOptions retryOnQuotaViolation(boolean retryOnQuotaViolation) {
this.retryOnQuotaViolation = retryOnQuotaViolation;
return this;
}

/**
* Returns true if quota violation should be automatically retried.
*/
public boolean shouldRetryOnQuotaViolation() {
return retryOnQuotaViolation;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LiControlledShutdownSkipSafetyCheckRequestData;
import org.apache.kafka.common.message.LiCreateFederatedTopicZnodesRequestData;
import org.apache.kafka.common.message.LiDeleteFederatedTopicZnodesRequestData;
import org.apache.kafka.common.message.LiMoveControllerRequestData;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
Expand Down Expand Up @@ -224,6 +225,8 @@
import org.apache.kafka.common.requests.LiMoveControllerResponse;
import org.apache.kafka.common.requests.LiCreateFederatedTopicZnodesRequest;
import org.apache.kafka.common.requests.LiCreateFederatedTopicZnodesResponse;
import org.apache.kafka.common.requests.LiDeleteFederatedTopicZnodesRequest;
import org.apache.kafka.common.requests.LiDeleteFederatedTopicZnodesResponse;
import org.apache.kafka.common.requests.ListGroupsRequest;
import org.apache.kafka.common.requests.ListGroupsResponse;
import org.apache.kafka.common.requests.ListOffsetsRequest;
Expand Down Expand Up @@ -1750,6 +1753,45 @@ else if (topics instanceof TopicNameCollection)
throw new IllegalArgumentException("The TopicCollection: " + topics + " provided did not match any supported classes for deleteTopics.");
}

@Override
public CreateOrDeleteFederatedTopicZnodesResult deleteFederatedTopicZnodes(Map<String, String> federatedTopics,
DeleteFederatedTopicZnodesOptions options) {
final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(federatedTopics.size());
final long now = time.milliseconds();
List<LiDeleteFederatedTopicZnodesRequestData.FederatedTopics> topics = new ArrayList<>();
federatedTopics.forEach((topic, namespace) -> {
topics.add(new LiDeleteFederatedTopicZnodesRequestData.FederatedTopics().setName(topic).setNamespace(namespace));
topicFutures.put(topic, new KafkaFutureImpl<>());
});
runnable.call(new Call("deleteFederatedTopicsZnode", calcDeadlineMs(now, options.timeoutMs()),
new ControllerNodeProvider()) {
@Override
AbstractRequest.Builder<?> createRequest(int timeoutMs) {
return new LiDeleteFederatedTopicZnodesRequest.Builder(new LiDeleteFederatedTopicZnodesRequestData()
.setTopics(topics)
.setTimeoutMs(timeoutMs), (short) 0);
}

@Override
void handleResponse(AbstractResponse abstractResponse) {
LiDeleteFederatedTopicZnodesResponse response = (LiDeleteFederatedTopicZnodesResponse) abstractResponse;
Errors errors = Errors.forCode(response.data().errorCode());
if (errors != Errors.NONE) {
completeAllExceptionally(topicFutures.values(), errors.exception());
return;
}
topicFutures.values().forEach(f -> f.complete(null));
}

@Override
void handleFailure(Throwable throwable) {
// Fail all the other remaining futures
completeAllExceptionally(topicFutures.values(), throwable);
}
}, now);
return new CreateOrDeleteFederatedTopicZnodesResult(new HashMap<>(topicFutures));
}

private Map<String, KafkaFuture<Void>> handleDeleteTopicsUsingNames(final Collection<String> topicNames,
final DeleteTopicsOptions options) {
final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(topicNames.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ public DeleteTopicsResult deleteTopics(TopicCollection topics, DeleteTopicsOptio
return null;
}

@Override
public CreateOrDeleteFederatedTopicZnodesResult deleteFederatedTopicZnodes(Map<String, String> federatedTopics,
DeleteFederatedTopicZnodesOptions options) {
return null;
}

@Override
public ListTopicsResult listTopics(ListTopicsOptions options) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ public enum ApiKeys {
LI_COMBINED_CONTROL(ApiMessageType.LI_COMBINED_CONTROL, true),
LI_MOVE_CONTROLLER(ApiMessageType.LI_MOVE_CONTROLLER, true),

LI_CREATE_FEDERATED_TOPIC_ZNODES(ApiMessageType.LI_CREATE_FEDERATED_TOPIC_ZNODES, false, true);
LI_CREATE_FEDERATED_TOPIC_ZNODES(ApiMessageType.LI_CREATE_FEDERATED_TOPIC_ZNODES, false, true),
LI_DELETE_FEDERATED_TOPIC_ZNODES(ApiMessageType.LI_DELETE_FEDERATED_TOPIC_ZNODES, false, true);

private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
new EnumMap<>(ApiMessageType.ListenerType.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion,
return LiMoveControllerRequest.parse(buffer, apiVersion);
case LI_CREATE_FEDERATED_TOPIC_ZNODES:
return LiCreateFederatedTopicZnodesRequest.parse(buffer, apiVersion);
case LI_DELETE_FEDERATED_TOPIC_ZNODES:
return LiDeleteFederatedTopicZnodesRequest.parse(buffer, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, ByteBuffer response
return LiMoveControllerResponse.parse(responseBuffer, version);
case LI_CREATE_FEDERATED_TOPIC_ZNODES:
return LiCreateFederatedTopicZnodesResponse.parse(responseBuffer, version);
case LI_DELETE_FEDERATED_TOPIC_ZNODES:
return LiDeleteFederatedTopicZnodesResponse.parse(responseBuffer, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.requests;

import java.nio.ByteBuffer;
import org.apache.kafka.common.message.LiDeleteFederatedTopicZnodesRequestData;
import org.apache.kafka.common.message.LiDeleteFederatedTopicZnodesResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;


public class LiDeleteFederatedTopicZnodesRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<LiDeleteFederatedTopicZnodesRequest> {
private final LiDeleteFederatedTopicZnodesRequestData data;

public Builder(LiDeleteFederatedTopicZnodesRequestData data, short allowedVersion) {
super(ApiKeys.LI_DELETE_FEDERATED_TOPIC_ZNODES, allowedVersion);
this.data = data;
}

@Override
public LiDeleteFederatedTopicZnodesRequest build(short version) {
return new LiDeleteFederatedTopicZnodesRequest(data, version);
}

@Override
public String toString() {
return data.toString();
}
}

private final LiDeleteFederatedTopicZnodesRequestData data;

LiDeleteFederatedTopicZnodesRequest(LiDeleteFederatedTopicZnodesRequestData data, short version) {
super(ApiKeys.LI_DELETE_FEDERATED_TOPIC_ZNODES, version);
this.data = data;
}

public static LiDeleteFederatedTopicZnodesRequest parse(ByteBuffer buffer, short version) {
return new LiDeleteFederatedTopicZnodesRequest(new LiDeleteFederatedTopicZnodesRequestData(new ByteBufferAccessor(buffer), version), version);
}

@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
LiDeleteFederatedTopicZnodesResponseData data = new LiDeleteFederatedTopicZnodesResponseData()
.setErrorCode(Errors.forException(e).code());
return new LiDeleteFederatedTopicZnodesResponse(data, version());
}

@Override
public LiDeleteFederatedTopicZnodesRequestData data() {
return data;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.requests;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.common.message.LiDeleteFederatedTopicZnodesResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;


public class LiDeleteFederatedTopicZnodesResponse extends AbstractResponse {
private final LiDeleteFederatedTopicZnodesResponseData data;
private final short version;

public LiDeleteFederatedTopicZnodesResponse(LiDeleteFederatedTopicZnodesResponseData data, short version) {
super(ApiKeys.LI_DELETE_FEDERATED_TOPIC_ZNODES);
this.data = data;
this.version = version;
}

public Errors error() {
return Errors.forCode(data.errorCode());
}

@Override
public Map<Errors, Integer> errorCounts() {
return Collections.singletonMap(error(), 1);
}

@Override
public LiDeleteFederatedTopicZnodesResponseData data() {
return data;
}

public static LiDeleteFederatedTopicZnodesResponse prepareResponse(Errors error, int throttleTimeMs, short version) {
LiDeleteFederatedTopicZnodesResponseData data = new LiDeleteFederatedTopicZnodesResponseData();
data.setErrorCode(error.code());
data.setThrottleTimeMs(throttleTimeMs);
return new LiDeleteFederatedTopicZnodesResponse(data, version);
}

public static LiDeleteFederatedTopicZnodesResponse parse(ByteBuffer buffer, short version) {
return new LiDeleteFederatedTopicZnodesResponse(new LiDeleteFederatedTopicZnodesResponseData(new ByteBufferAccessor(buffer), version), version);
}

@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}

public short version() {
return version;
}

@Override
public String toString() {
return data.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 1004,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"name": "LiDeleteFederatedTopicZnodesRequest",
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "Topics", "type": "[]FederatedTopics", "versions": "0+", "about": "The name and namespace of the federated topic",
"fields": [
{"name": "Name", "type": "string", "versions": "0+", "about": "The topic name"},
{"name": "Namespace", "type": "string", "versions": "0+", "about": "The namespace of the topic"}
]},
{ "name": "TimeoutMs", "type": "int32", "versions": "0+",
"about": "The length of time in milliseconds to wait for the deletions to complete." }
]
}
Loading

0 comments on commit 859b9c7

Please sign in to comment.