From 3dcde599c29cdcb9720a6483564e95a6f16d8c6e Mon Sep 17 00:00:00 2001 From: kehu Date: Thu, 26 Oct 2023 17:15:27 -0700 Subject: [PATCH] more naming --- .../kafka/clients/admin/KafkaAdminClient.java | 14 ++++----- .../common/requests/AbstractRequest.java | 2 +- .../common/requests/AbstractResponse.java | 2 +- ...> LiFederatedTopicZnodeCreateRequest.java} | 30 +++++++++---------- ... LiFederatedTopicZnodeCreateResponse.java} | 20 ++++++------- ...> LiFederatedTopicZnodeCreateRequest.json} | 2 +- ... LiFederatedTopicZnodeCreateResponse.json} | 2 +- .../kafka/network/RequestConvertToJson.scala | 4 +-- .../main/scala/kafka/server/KafkaApis.scala | 6 ++-- .../unit/kafka/server/RequestQuotaTest.scala | 2 +- 10 files changed, 42 insertions(+), 42 deletions(-) rename clients/src/main/java/org/apache/kafka/common/requests/{LiFederatedTopicCreateRequest.java => LiFederatedTopicZnodeCreateRequest.java} (57%) rename clients/src/main/java/org/apache/kafka/common/requests/{LiFederatedTopicCreateResponse.java => LiFederatedTopicZnodeCreateResponse.java} (66%) rename clients/src/main/resources/common/message/{LiFederatedTopicCreateRequest.json => LiFederatedTopicZnodeCreateRequest.json} (96%) rename clients/src/main/resources/common/message/{LiFederatedTopicCreateResponse.json => LiFederatedTopicZnodeCreateResponse.json} (96%) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 693ef47aa6db8..fb281acfd2ea4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -141,7 +141,7 @@ import org.apache.kafka.common.message.ExpireDelegationTokenRequestData; import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity; import org.apache.kafka.common.message.LiControlledShutdownSkipSafetyCheckRequestData; -import org.apache.kafka.common.message.LiFederatedTopicCreateRequestData; +import org.apache.kafka.common.message.LiFederatedTopicZnodeCreateRequestData; import org.apache.kafka.common.message.LiMoveControllerRequestData; import org.apache.kafka.common.message.ListGroupsRequestData; import org.apache.kafka.common.message.ListGroupsResponseData; @@ -222,8 +222,8 @@ import org.apache.kafka.common.requests.LiControlledShutdownSkipSafetyCheckResponse; import org.apache.kafka.common.requests.LiMoveControllerRequest; import org.apache.kafka.common.requests.LiMoveControllerResponse; -import org.apache.kafka.common.requests.LiFederatedTopicCreateRequest; -import org.apache.kafka.common.requests.LiFederatedTopicCreateResponse; +import org.apache.kafka.common.requests.LiFederatedTopicZnodeCreateRequest; +import org.apache.kafka.common.requests.LiFederatedTopicZnodeCreateResponse; import org.apache.kafka.common.requests.ListGroupsRequest; import org.apache.kafka.common.requests.ListGroupsResponse; import org.apache.kafka.common.requests.ListOffsetsRequest; @@ -1604,23 +1604,23 @@ public CreateOrDeleteFederatedTopicsZnodeResult createFederatedTopicsZnode(final final CreateFederatedTopicsZnodeOptions options) { final Map> topicFutures = new HashMap<>(federatedTopics.size()); final long now = time.milliseconds(); - List topics = new ArrayList<>(); + List topics = new ArrayList<>(); federatedTopics.forEach((topic, namespace) -> { - topics.add(new LiFederatedTopicCreateRequestData.FederatedTopics().setName(topic).setNamespace(namespace)); + topics.add(new LiFederatedTopicZnodeCreateRequestData.FederatedTopics().setName(topic).setNamespace(namespace)); topicFutures.put(topic, new KafkaFutureImpl<>()); }); runnable.call(new Call("createFederatedTopicsZnode", calcDeadlineMs(now, options.timeoutMs()), new ControllerNodeProvider()) { @Override AbstractRequest.Builder createRequest(int timeoutMs) { - return new LiFederatedTopicCreateRequest.Builder(new LiFederatedTopicCreateRequestData() + return new LiFederatedTopicZnodeCreateRequest.Builder(new LiFederatedTopicZnodeCreateRequestData() .setTopics(topics) .setTimeoutMs(timeoutMs), (short) 0); } @Override void handleResponse(AbstractResponse abstractResponse) { - LiFederatedTopicCreateResponse response = (LiFederatedTopicCreateResponse) abstractResponse; + LiFederatedTopicZnodeCreateResponse response = (LiFederatedTopicZnodeCreateResponse) abstractResponse; Errors errors = Errors.forCode(response.data().errorCode()); if (errors != Errors.NONE) { completeAllExceptionally(topicFutures.values(), errors.exception()); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 823a935d1bcdb..0f1d9bca6eefe 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -310,7 +310,7 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion, case LI_MOVE_CONTROLLER: return LiMoveControllerRequest.parse(buffer, apiVersion); case LI_CREATE_FEDERATED_TOPIC_ZNODES: - return LiFederatedTopicCreateRequest.parse(buffer, apiVersion); + return LiFederatedTopicZnodeCreateRequest.parse(buffer, apiVersion); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 80c852c0da4c9..46c65e03263ce 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -254,7 +254,7 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, ByteBuffer response case LI_MOVE_CONTROLLER: return LiMoveControllerResponse.parse(responseBuffer, version); case LI_CREATE_FEDERATED_TOPIC_ZNODES: - return LiFederatedTopicCreateResponse.parse(responseBuffer, version); + return LiFederatedTopicZnodeCreateResponse.parse(responseBuffer, version); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LiFederatedTopicCreateRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LiFederatedTopicZnodeCreateRequest.java similarity index 57% rename from clients/src/main/java/org/apache/kafka/common/requests/LiFederatedTopicCreateRequest.java rename to clients/src/main/java/org/apache/kafka/common/requests/LiFederatedTopicZnodeCreateRequest.java index a4d82c61d65bc..63f208649c2ae 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LiFederatedTopicCreateRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LiFederatedTopicZnodeCreateRequest.java @@ -18,25 +18,25 @@ package org.apache.kafka.common.requests; import java.nio.ByteBuffer; -import org.apache.kafka.common.message.LiFederatedTopicCreateRequestData; -import org.apache.kafka.common.message.LiFederatedTopicCreateResponseData; +import org.apache.kafka.common.message.LiFederatedTopicZnodeCreateRequestData; +import org.apache.kafka.common.message.LiFederatedTopicZnodeCreateResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; -public class LiFederatedTopicCreateRequest extends AbstractRequest { - public static class Builder extends AbstractRequest.Builder { - private final LiFederatedTopicCreateRequestData data; +public class LiFederatedTopicZnodeCreateRequest extends AbstractRequest { + public static class Builder extends AbstractRequest.Builder { + private final LiFederatedTopicZnodeCreateRequestData data; - public Builder(LiFederatedTopicCreateRequestData data, short allowedVersion) { + public Builder(LiFederatedTopicZnodeCreateRequestData data, short allowedVersion) { super(ApiKeys.LI_CREATE_FEDERATED_TOPIC_ZNODES, allowedVersion); this.data = data; } @Override - public LiFederatedTopicCreateRequest build(short version) { - return new LiFederatedTopicCreateRequest(data, version); + public LiFederatedTopicZnodeCreateRequest build(short version) { + return new LiFederatedTopicZnodeCreateRequest(data, version); } @Override @@ -45,26 +45,26 @@ public String toString() { } } - private final LiFederatedTopicCreateRequestData data; + private final LiFederatedTopicZnodeCreateRequestData data; - LiFederatedTopicCreateRequest(LiFederatedTopicCreateRequestData data, short version) { + LiFederatedTopicZnodeCreateRequest(LiFederatedTopicZnodeCreateRequestData data, short version) { super(ApiKeys.LI_CREATE_FEDERATED_TOPIC_ZNODES, version); this.data = data; } - public static LiFederatedTopicCreateRequest parse(ByteBuffer buffer, short version) { - return new LiFederatedTopicCreateRequest(new LiFederatedTopicCreateRequestData(new ByteBufferAccessor(buffer), version), version); + public static LiFederatedTopicZnodeCreateRequest parse(ByteBuffer buffer, short version) { + return new LiFederatedTopicZnodeCreateRequest(new LiFederatedTopicZnodeCreateRequestData(new ByteBufferAccessor(buffer), version), version); } @Override public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { - LiFederatedTopicCreateResponseData data = new LiFederatedTopicCreateResponseData() + LiFederatedTopicZnodeCreateResponseData data = new LiFederatedTopicZnodeCreateResponseData() .setErrorCode(Errors.forException(e).code()); - return new LiFederatedTopicCreateResponse(data, version()); + return new LiFederatedTopicZnodeCreateResponse(data, version()); } @Override - public LiFederatedTopicCreateRequestData data() { + public LiFederatedTopicZnodeCreateRequestData data() { return data; } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LiFederatedTopicCreateResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LiFederatedTopicZnodeCreateResponse.java similarity index 66% rename from clients/src/main/java/org/apache/kafka/common/requests/LiFederatedTopicCreateResponse.java rename to clients/src/main/java/org/apache/kafka/common/requests/LiFederatedTopicZnodeCreateResponse.java index bcbf797054287..6ea2e0b1c01e2 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LiFederatedTopicCreateResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LiFederatedTopicZnodeCreateResponse.java @@ -20,17 +20,17 @@ import java.nio.ByteBuffer; import java.util.Collections; import java.util.Map; -import org.apache.kafka.common.message.LiFederatedTopicCreateResponseData; +import org.apache.kafka.common.message.LiFederatedTopicZnodeCreateResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; -public class LiFederatedTopicCreateResponse extends AbstractResponse { - private final LiFederatedTopicCreateResponseData data; +public class LiFederatedTopicZnodeCreateResponse extends AbstractResponse { + private final LiFederatedTopicZnodeCreateResponseData data; private final short version; - public LiFederatedTopicCreateResponse(LiFederatedTopicCreateResponseData data, short version) { + public LiFederatedTopicZnodeCreateResponse(LiFederatedTopicZnodeCreateResponseData data, short version) { super(ApiKeys.LI_CREATE_FEDERATED_TOPIC_ZNODES); this.data = data; this.version = version; @@ -46,19 +46,19 @@ public Map errorCounts() { } @Override - public LiFederatedTopicCreateResponseData data() { + public LiFederatedTopicZnodeCreateResponseData data() { return data; } - public static LiFederatedTopicCreateResponse prepareResponse(Errors error, int throttleTimeMs, short version) { - LiFederatedTopicCreateResponseData data = new LiFederatedTopicCreateResponseData(); + public static LiFederatedTopicZnodeCreateResponse prepareResponse(Errors error, int throttleTimeMs, short version) { + LiFederatedTopicZnodeCreateResponseData data = new LiFederatedTopicZnodeCreateResponseData(); data.setErrorCode(error.code()); data.setThrottleTimeMs(throttleTimeMs); - return new LiFederatedTopicCreateResponse(data, version); + return new LiFederatedTopicZnodeCreateResponse(data, version); } - public static LiFederatedTopicCreateResponse parse(ByteBuffer buffer, short version) { - return new LiFederatedTopicCreateResponse(new LiFederatedTopicCreateResponseData(new ByteBufferAccessor(buffer), version), version); + public static LiFederatedTopicZnodeCreateResponse parse(ByteBuffer buffer, short version) { + return new LiFederatedTopicZnodeCreateResponse(new LiFederatedTopicZnodeCreateResponseData(new ByteBufferAccessor(buffer), version), version); } @Override diff --git a/clients/src/main/resources/common/message/LiFederatedTopicCreateRequest.json b/clients/src/main/resources/common/message/LiFederatedTopicZnodeCreateRequest.json similarity index 96% rename from clients/src/main/resources/common/message/LiFederatedTopicCreateRequest.json rename to clients/src/main/resources/common/message/LiFederatedTopicZnodeCreateRequest.json index 6d21e61f17098..1581ff84dcd28 100644 --- a/clients/src/main/resources/common/message/LiFederatedTopicCreateRequest.json +++ b/clients/src/main/resources/common/message/LiFederatedTopicZnodeCreateRequest.json @@ -17,7 +17,7 @@ "apiKey": 1003, "type": "request", "listeners": ["zkBroker", "broker", "controller"], - "name": "LiFederatedTopicCreateRequest", + "name": "LiFederatedTopicZnodeCreateRequest", "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ diff --git a/clients/src/main/resources/common/message/LiFederatedTopicCreateResponse.json b/clients/src/main/resources/common/message/LiFederatedTopicZnodeCreateResponse.json similarity index 96% rename from clients/src/main/resources/common/message/LiFederatedTopicCreateResponse.json rename to clients/src/main/resources/common/message/LiFederatedTopicZnodeCreateResponse.json index 7fbf4ae901f69..5da4b48dfe9fd 100644 --- a/clients/src/main/resources/common/message/LiFederatedTopicCreateResponse.json +++ b/clients/src/main/resources/common/message/LiFederatedTopicZnodeCreateResponse.json @@ -15,7 +15,7 @@ { "apiKey": 1003, "type": "response", - "name": "LiFederatedTopicCreateResponse", + "name": "LiFederatedTopicZnodeCreateResponse", "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ diff --git a/core/src/main/scala/kafka/network/RequestConvertToJson.scala b/core/src/main/scala/kafka/network/RequestConvertToJson.scala index 6b76c11a55fd5..04bf3d0b84107 100644 --- a/core/src/main/scala/kafka/network/RequestConvertToJson.scala +++ b/core/src/main/scala/kafka/network/RequestConvertToJson.scala @@ -98,7 +98,7 @@ object RequestConvertToJson { case req: LiControlledShutdownSkipSafetyCheckRequest => LiControlledShutdownSkipSafetyCheckRequestDataJsonConverter.write(req.data, request.version) case req: LiCombinedControlRequest => LiCombinedControlRequestDataJsonConverter.write(req.data, request.version()) case req: LiMoveControllerRequest => LiMoveControllerRequestDataJsonConverter.write(req.data, request.version()) - case req: LiFederatedTopicCreateRequest => LiFederatedTopicCreateRequestDataJsonConverter.write(req.data, request.version()) + case req: LiFederatedTopicZnodeCreateRequest => LiFederatedTopicCreateRequestDataJsonConverter.write(req.data, request.version()) case _ => throw new IllegalStateException(s"ApiKey ${request.apiKey} is not currently handled in `request`, the " + "code should be updated to do so."); } @@ -177,7 +177,7 @@ object RequestConvertToJson { case res: LiControlledShutdownSkipSafetyCheckResponse => LiControlledShutdownSkipSafetyCheckResponseDataJsonConverter.write(res.data, version) case res: LiCombinedControlResponse => LiCombinedControlResponseDataJsonConverter.write(res.data, version) case res: LiMoveControllerResponse => LiMoveControllerResponseDataJsonConverter.write(res.data, version) - case res: LiFederatedTopicCreateResponse => LiFederatedTopicCreateResponseDataJsonConverter.write(res.data, version) + case res: LiFederatedTopicZnodeCreateResponse => LiFederatedTopicCreateResponseDataJsonConverter.write(res.data, version) case _ => throw new IllegalStateException(s"ApiKey ${response.apiKey} is not currently handled in `response`, the " + "code should be updated to do so."); } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 1ca0ed3c8cdd7..1adcee4b76780 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -3688,12 +3688,12 @@ class KafkaApis(val requestChannel: RequestChannel, } def handleMarkFederatedTopicRequest(request: RequestChannel.Request): Unit = { - val markFederatedTopicRequest = request.body[LiFederatedTopicCreateRequest] + val markFederatedTopicRequest = request.body[LiFederatedTopicZnodeCreateRequest] val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request)) if (!zkSupport.controller.isActive) { requestHelper.sendResponseExemptThrottle(request, - LiFederatedTopicCreateResponse.prepareResponse(Errors.NOT_CONTROLLER, 0, markFederatedTopicRequest.version()) + LiFederatedTopicZnodeCreateResponse.prepareResponse(Errors.NOT_CONTROLLER, 0, markFederatedTopicRequest.version()) ) } else { val hasClusterAuthorization = authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, @@ -3736,7 +3736,7 @@ class KafkaApis(val requestChannel: RequestChannel, zkSupport.zkClient.createFederatedTopicZNode(entry._1, entry._2) }) requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => - LiFederatedTopicCreateResponse.prepareResponse(Errors.NONE, requestThrottleMs, markFederatedTopicRequest.version()) + LiFederatedTopicZnodeCreateResponse.prepareResponse(Errors.NONE, requestThrottleMs, markFederatedTopicRequest.version()) ) } catch { case throwable: Throwable => diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 0d2072ae0e5cc..48987923a6dc9 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -662,7 +662,7 @@ class RequestQuotaTest extends BaseRequestTest { new LiMoveControllerRequest.Builder(new LiMoveControllerRequestData(), ApiKeys.LI_MOVE_CONTROLLER.latestVersion) case ApiKeys.LI_CREATE_FEDERATED_TOPIC_ZNODES => - new LiFederatedTopicCreateRequest.Builder(new LiFederatedTopicCreateRequestData(), ApiKeys.LI_CREATE_FEDERATED_TOPIC_ZNODES.latestVersion) + new LiFederatedTopicZnodeCreateRequest.Builder(new LiFederatedTopicCreateRequestData(), ApiKeys.LI_CREATE_FEDERATED_TOPIC_ZNODES.latestVersion) case _ => throw new IllegalArgumentException("Unsupported API key " + apiKey)