Skip to content

Commit

Permalink
Add list federated topic znodes rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
kehuum committed Nov 3, 2023
1 parent 142cae1 commit 939ab41
Show file tree
Hide file tree
Showing 18 changed files with 435 additions and 2 deletions.
19 changes: 19 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -309,6 +310,24 @@ default ListTopicsResult listTopics() {
*/
ListTopicsResult listTopics(ListTopicsOptions options);

/**
* List all federated topic znodes
* @return all federated topic znodes, formatted /namespace/topic
*/
default ListFederatedTopicZnodesResult listFederatedTopicZnodes() {
return listFederatedTopicZnodes(Collections.emptyList(), new ListFederatedTopicZnodesOptions());
}

/**
* List federated topic znodes match given topic names
* @param federatedTopics topic names
* @param options the options to use when list federated topic znodes
* @return empty list if the given topic names' znode don't exist; otherwise return the federated topics formatted
* /namespace/topic
*/
ListFederatedTopicZnodesResult listFederatedTopicZnodes(List<String> federatedTopics,
ListFederatedTopicZnodesOptions options);

/**
* Describe some topics in the cluster, with the default options.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@
import org.apache.kafka.common.message.LiControlledShutdownSkipSafetyCheckRequestData;
import org.apache.kafka.common.message.LiCreateFederatedTopicZnodesRequestData;
import org.apache.kafka.common.message.LiDeleteFederatedTopicZnodesRequestData;
import org.apache.kafka.common.message.LiListFederatedTopicZnodesRequestData;
import org.apache.kafka.common.message.LiMoveControllerRequestData;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
Expand Down Expand Up @@ -221,6 +222,8 @@
import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
import org.apache.kafka.common.requests.LiControlledShutdownSkipSafetyCheckRequest;
import org.apache.kafka.common.requests.LiControlledShutdownSkipSafetyCheckResponse;
import org.apache.kafka.common.requests.LiListFederatedTopicZnodesRequest;
import org.apache.kafka.common.requests.LiListFederatedTopicZnodesResponse;
import org.apache.kafka.common.requests.LiMoveControllerRequest;
import org.apache.kafka.common.requests.LiMoveControllerResponse;
import org.apache.kafka.common.requests.LiCreateFederatedTopicZnodesRequest;
Expand Down Expand Up @@ -2029,6 +2032,45 @@ void handleFailure(Throwable throwable) {
return new ListTopicsResult(topicListingFuture);
}

@Override
public ListFederatedTopicZnodesResult listFederatedTopicZnodes(List<String> federatedTopics,
ListFederatedTopicZnodesOptions options) {
final KafkaFutureImpl<List<String>> federatedTopicZnodesListingFuture = new KafkaFutureImpl<>();
List<LiListFederatedTopicZnodesRequestData.FederatedTopics> topicsRequested = new ArrayList<>();
federatedTopics.forEach(topic ->
topicsRequested.add(new LiListFederatedTopicZnodesRequestData.FederatedTopics().setName(topic)));
final long now = time.milliseconds();
runnable.call(new Call("listFederatedTopicZnodes", calcDeadlineMs(now, options.timeoutMs()),
new LeastLoadedNodeProvider()) {

@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
return new LiListFederatedTopicZnodesRequest.Builder(
new LiListFederatedTopicZnodesRequestData().setTopics(topicsRequested), (short) 0
);
}

@Override
void handleResponse(AbstractResponse abstractResponse) {
LiListFederatedTopicZnodesResponse response = (LiListFederatedTopicZnodesResponse) abstractResponse;
Errors error = Errors.forCode(response.data().errorCode());
if (error != Errors.NONE) {
federatedTopicZnodesListingFuture.completeExceptionally(error.exception());
return;
}

List<String> allFederatedTopicZnodes = new ArrayList<>(response.data().topics());
federatedTopicZnodesListingFuture.complete(allFederatedTopicZnodes);
}

@Override
void handleFailure(Throwable throwable) {
federatedTopicZnodesListingFuture.completeExceptionally(throwable);
}
}, now);
return new ListFederatedTopicZnodesResult(federatedTopicZnodesListingFuture);
}

@Override
public DescribeTopicsResult describeTopics(final Collection<String> topicNames, DescribeTopicsOptions options) {
final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin;

import org.apache.kafka.common.annotation.InterfaceStability;


/**
* Options for {@link Admin#listFederatedTopicZnodes()}.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class ListFederatedTopicZnodesOptions extends AbstractOptions<ListFederatedTopicZnodesOptions> {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin;

import java.util.List;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;


/**
* The result of the {@link Admin#listFederatedTopicZnodes()} call.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class ListFederatedTopicZnodesResult {
final KafkaFuture<List<String>> future;

ListFederatedTopicZnodesResult(KafkaFuture<List<String>> future) {
this.future = future;
}

/**
* Return a future which yields a list of federated topic names
*/
public KafkaFuture<List<String>> topics() {
return future;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ public ListTopicsResult listTopics(ListTopicsOptions options) {
return null;
}

@Override
public ListFederatedTopicZnodesResult listFederatedTopicZnodes(List<String> federatedTopics,
ListFederatedTopicZnodesOptions options) {
return null;
}

@Override
public DescribeTopicsResult describeTopics(Collection<String> topicNames, DescribeTopicsOptions options) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ public enum ApiKeys {
LI_MOVE_CONTROLLER(ApiMessageType.LI_MOVE_CONTROLLER, true),

LI_CREATE_FEDERATED_TOPIC_ZNODES(ApiMessageType.LI_CREATE_FEDERATED_TOPIC_ZNODES, false, true),
LI_DELETE_FEDERATED_TOPIC_ZNODES(ApiMessageType.LI_DELETE_FEDERATED_TOPIC_ZNODES, false, true);
LI_DELETE_FEDERATED_TOPIC_ZNODES(ApiMessageType.LI_DELETE_FEDERATED_TOPIC_ZNODES, false, true),
LI_LIST_FEDERATED_TOPIC_ZNODES(ApiMessageType.LI_LIST_FEDERATED_TOPIC_ZNODES);

private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
new EnumMap<>(ApiMessageType.ListenerType.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,8 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion,
return LiCreateFederatedTopicZnodesRequest.parse(buffer, apiVersion);
case LI_DELETE_FEDERATED_TOPIC_ZNODES:
return LiDeleteFederatedTopicZnodesRequest.parse(buffer, apiVersion);
case LI_LIST_FEDERATED_TOPIC_ZNODES:
return LiListFederatedTopicZnodesRequest.parse(buffer, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,8 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, ByteBuffer response
return LiCreateFederatedTopicZnodesResponse.parse(responseBuffer, version);
case LI_DELETE_FEDERATED_TOPIC_ZNODES:
return LiDeleteFederatedTopicZnodesResponse.parse(responseBuffer, version);
case LI_LIST_FEDERATED_TOPIC_ZNODES:
return LiListFederatedTopicZnodesResponse.parse(responseBuffer, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.requests;

import java.nio.ByteBuffer;
import java.util.Collections;
import org.apache.kafka.common.message.LiListFederatedTopicZnodesRequestData;
import org.apache.kafka.common.message.LiListFederatedTopicZnodesResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;


public class LiListFederatedTopicZnodesRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<LiListFederatedTopicZnodesRequest> {

private final LiListFederatedTopicZnodesRequestData data;

public Builder(LiListFederatedTopicZnodesRequestData data, short allowedVersion) {
super(ApiKeys.LI_LIST_FEDERATED_TOPIC_ZNODES, allowedVersion);
this.data = data;
}

@Override
public LiListFederatedTopicZnodesRequest build(short version) {
return new LiListFederatedTopicZnodesRequest(data, version);
}

@Override
public String toString() {
return data.toString();
}
}

private final LiListFederatedTopicZnodesRequestData data;

public LiListFederatedTopicZnodesRequest(LiListFederatedTopicZnodesRequestData data, short version) {
super(ApiKeys.LI_LIST_FEDERATED_TOPIC_ZNODES, version);
this.data = data;
}

@Override
public LiListFederatedTopicZnodesResponse getErrorResponse(int throttleTimeMs, Throwable e) {
LiListFederatedTopicZnodesResponseData data = new LiListFederatedTopicZnodesResponseData().
setTopics(Collections.emptyList()).
setErrorCode(Errors.forException(e).code());
return new LiListFederatedTopicZnodesResponse(data, version());
}

public static LiListFederatedTopicZnodesRequest parse(ByteBuffer buffer, short version) {
return new LiListFederatedTopicZnodesRequest(
new LiListFederatedTopicZnodesRequestData(new ByteBufferAccessor(buffer), version), version
);
}

@Override
public LiListFederatedTopicZnodesRequestData data() {
return data;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.requests;

import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.kafka.common.message.LiListFederatedTopicZnodesResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;


public class LiListFederatedTopicZnodesResponse extends AbstractResponse {
private final LiListFederatedTopicZnodesResponseData data;
private final short version;

public LiListFederatedTopicZnodesResponse(LiListFederatedTopicZnodesResponseData data, short version) {
super(ApiKeys.LI_LIST_FEDERATED_TOPIC_ZNODES);
this.data = data;
this.version = version;
}

@Override
public LiListFederatedTopicZnodesResponseData data() {
return data;
}

@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}

@Override
public Map<Errors, Integer> errorCounts() {
return errorCounts(Errors.forCode(data.errorCode()));
}

public static LiListFederatedTopicZnodesResponse parse(ByteBuffer buffer, short version) {
return new LiListFederatedTopicZnodesResponse(
new LiListFederatedTopicZnodesResponseData(new ByteBufferAccessor(buffer), version), version
);
}

public static LiListFederatedTopicZnodesResponse prepareResponse(Errors error, int throttleTimeMs, short version) {
LiListFederatedTopicZnodesResponseData data = new LiListFederatedTopicZnodesResponseData();
data.setErrorCode(error.code());
data.setThrottleTimeMs(throttleTimeMs);
return new LiListFederatedTopicZnodesResponse(data, version);
}

public short version() {
return version;
}

@Override
public String toString() {
return data.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 1005,
"type": "request",
"listeners": ["zkBroker"],
"name": "LiListFederatedTopicZnodesRequest",
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "Topics", "type": "[]FederatedTopics", "versions": "0+", "about": "The simple name of the federated topics",
"fields": [
{"name": "Name", "type": "string", "versions": "0+", "about": "The topic name"}
]}
]
}
Loading

0 comments on commit 939ab41

Please sign in to comment.