Skip to content

Commit

Permalink
more naming
Browse files Browse the repository at this point in the history
  • Loading branch information
kehuum committed Oct 27, 2023
1 parent 4ee7307 commit 3dcde59
Show file tree
Hide file tree
Showing 10 changed files with 42 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LiControlledShutdownSkipSafetyCheckRequestData;
import org.apache.kafka.common.message.LiFederatedTopicCreateRequestData;
import org.apache.kafka.common.message.LiFederatedTopicZnodeCreateRequestData;
import org.apache.kafka.common.message.LiMoveControllerRequestData;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
Expand Down Expand Up @@ -222,8 +222,8 @@
import org.apache.kafka.common.requests.LiControlledShutdownSkipSafetyCheckResponse;
import org.apache.kafka.common.requests.LiMoveControllerRequest;
import org.apache.kafka.common.requests.LiMoveControllerResponse;
import org.apache.kafka.common.requests.LiFederatedTopicCreateRequest;
import org.apache.kafka.common.requests.LiFederatedTopicCreateResponse;
import org.apache.kafka.common.requests.LiFederatedTopicZnodeCreateRequest;
import org.apache.kafka.common.requests.LiFederatedTopicZnodeCreateResponse;
import org.apache.kafka.common.requests.ListGroupsRequest;
import org.apache.kafka.common.requests.ListGroupsResponse;
import org.apache.kafka.common.requests.ListOffsetsRequest;
Expand Down Expand Up @@ -1604,23 +1604,23 @@ public CreateOrDeleteFederatedTopicsZnodeResult createFederatedTopicsZnode(final
final CreateFederatedTopicsZnodeOptions options) {
final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(federatedTopics.size());
final long now = time.milliseconds();
List<LiFederatedTopicCreateRequestData.FederatedTopics> topics = new ArrayList<>();
List<LiFederatedTopicZnodeCreateRequestData.FederatedTopics> topics = new ArrayList<>();
federatedTopics.forEach((topic, namespace) -> {
topics.add(new LiFederatedTopicCreateRequestData.FederatedTopics().setName(topic).setNamespace(namespace));
topics.add(new LiFederatedTopicZnodeCreateRequestData.FederatedTopics().setName(topic).setNamespace(namespace));
topicFutures.put(topic, new KafkaFutureImpl<>());
});
runnable.call(new Call("createFederatedTopicsZnode", calcDeadlineMs(now, options.timeoutMs()),
new ControllerNodeProvider()) {
@Override
AbstractRequest.Builder<?> createRequest(int timeoutMs) {
return new LiFederatedTopicCreateRequest.Builder(new LiFederatedTopicCreateRequestData()
return new LiFederatedTopicZnodeCreateRequest.Builder(new LiFederatedTopicZnodeCreateRequestData()
.setTopics(topics)
.setTimeoutMs(timeoutMs), (short) 0);
}

@Override
void handleResponse(AbstractResponse abstractResponse) {
LiFederatedTopicCreateResponse response = (LiFederatedTopicCreateResponse) abstractResponse;
LiFederatedTopicZnodeCreateResponse response = (LiFederatedTopicZnodeCreateResponse) abstractResponse;
Errors errors = Errors.forCode(response.data().errorCode());
if (errors != Errors.NONE) {
completeAllExceptionally(topicFutures.values(), errors.exception());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion,
case LI_MOVE_CONTROLLER:
return LiMoveControllerRequest.parse(buffer, apiVersion);
case LI_CREATE_FEDERATED_TOPIC_ZNODES:
return LiFederatedTopicCreateRequest.parse(buffer, apiVersion);
return LiFederatedTopicZnodeCreateRequest.parse(buffer, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, ByteBuffer response
case LI_MOVE_CONTROLLER:
return LiMoveControllerResponse.parse(responseBuffer, version);
case LI_CREATE_FEDERATED_TOPIC_ZNODES:
return LiFederatedTopicCreateResponse.parse(responseBuffer, version);
return LiFederatedTopicZnodeCreateResponse.parse(responseBuffer, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,25 @@
package org.apache.kafka.common.requests;

import java.nio.ByteBuffer;
import org.apache.kafka.common.message.LiFederatedTopicCreateRequestData;
import org.apache.kafka.common.message.LiFederatedTopicCreateResponseData;
import org.apache.kafka.common.message.LiFederatedTopicZnodeCreateRequestData;
import org.apache.kafka.common.message.LiFederatedTopicZnodeCreateResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;


public class LiFederatedTopicCreateRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<LiFederatedTopicCreateRequest> {
private final LiFederatedTopicCreateRequestData data;
public class LiFederatedTopicZnodeCreateRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<LiFederatedTopicZnodeCreateRequest> {
private final LiFederatedTopicZnodeCreateRequestData data;

public Builder(LiFederatedTopicCreateRequestData data, short allowedVersion) {
public Builder(LiFederatedTopicZnodeCreateRequestData data, short allowedVersion) {
super(ApiKeys.LI_CREATE_FEDERATED_TOPIC_ZNODES, allowedVersion);
this.data = data;
}

@Override
public LiFederatedTopicCreateRequest build(short version) {
return new LiFederatedTopicCreateRequest(data, version);
public LiFederatedTopicZnodeCreateRequest build(short version) {
return new LiFederatedTopicZnodeCreateRequest(data, version);
}

@Override
Expand All @@ -45,26 +45,26 @@ public String toString() {
}
}

private final LiFederatedTopicCreateRequestData data;
private final LiFederatedTopicZnodeCreateRequestData data;

LiFederatedTopicCreateRequest(LiFederatedTopicCreateRequestData data, short version) {
LiFederatedTopicZnodeCreateRequest(LiFederatedTopicZnodeCreateRequestData data, short version) {
super(ApiKeys.LI_CREATE_FEDERATED_TOPIC_ZNODES, version);
this.data = data;
}

public static LiFederatedTopicCreateRequest parse(ByteBuffer buffer, short version) {
return new LiFederatedTopicCreateRequest(new LiFederatedTopicCreateRequestData(new ByteBufferAccessor(buffer), version), version);
public static LiFederatedTopicZnodeCreateRequest parse(ByteBuffer buffer, short version) {
return new LiFederatedTopicZnodeCreateRequest(new LiFederatedTopicZnodeCreateRequestData(new ByteBufferAccessor(buffer), version), version);
}

@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
LiFederatedTopicCreateResponseData data = new LiFederatedTopicCreateResponseData()
LiFederatedTopicZnodeCreateResponseData data = new LiFederatedTopicZnodeCreateResponseData()
.setErrorCode(Errors.forException(e).code());
return new LiFederatedTopicCreateResponse(data, version());
return new LiFederatedTopicZnodeCreateResponse(data, version());
}

@Override
public LiFederatedTopicCreateRequestData data() {
public LiFederatedTopicZnodeCreateRequestData data() {
return data;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.common.message.LiFederatedTopicCreateResponseData;
import org.apache.kafka.common.message.LiFederatedTopicZnodeCreateResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;


public class LiFederatedTopicCreateResponse extends AbstractResponse {
private final LiFederatedTopicCreateResponseData data;
public class LiFederatedTopicZnodeCreateResponse extends AbstractResponse {
private final LiFederatedTopicZnodeCreateResponseData data;
private final short version;

public LiFederatedTopicCreateResponse(LiFederatedTopicCreateResponseData data, short version) {
public LiFederatedTopicZnodeCreateResponse(LiFederatedTopicZnodeCreateResponseData data, short version) {
super(ApiKeys.LI_CREATE_FEDERATED_TOPIC_ZNODES);
this.data = data;
this.version = version;
Expand All @@ -46,19 +46,19 @@ public Map<Errors, Integer> errorCounts() {
}

@Override
public LiFederatedTopicCreateResponseData data() {
public LiFederatedTopicZnodeCreateResponseData data() {
return data;
}

public static LiFederatedTopicCreateResponse prepareResponse(Errors error, int throttleTimeMs, short version) {
LiFederatedTopicCreateResponseData data = new LiFederatedTopicCreateResponseData();
public static LiFederatedTopicZnodeCreateResponse prepareResponse(Errors error, int throttleTimeMs, short version) {
LiFederatedTopicZnodeCreateResponseData data = new LiFederatedTopicZnodeCreateResponseData();
data.setErrorCode(error.code());
data.setThrottleTimeMs(throttleTimeMs);
return new LiFederatedTopicCreateResponse(data, version);
return new LiFederatedTopicZnodeCreateResponse(data, version);
}

public static LiFederatedTopicCreateResponse parse(ByteBuffer buffer, short version) {
return new LiFederatedTopicCreateResponse(new LiFederatedTopicCreateResponseData(new ByteBufferAccessor(buffer), version), version);
public static LiFederatedTopicZnodeCreateResponse parse(ByteBuffer buffer, short version) {
return new LiFederatedTopicZnodeCreateResponse(new LiFederatedTopicZnodeCreateResponseData(new ByteBufferAccessor(buffer), version), version);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"apiKey": 1003,
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"name": "LiFederatedTopicCreateRequest",
"name": "LiFederatedTopicZnodeCreateRequest",
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
{
"apiKey": 1003,
"type": "response",
"name": "LiFederatedTopicCreateResponse",
"name": "LiFederatedTopicZnodeCreateResponse",
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/network/RequestConvertToJson.scala
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ object RequestConvertToJson {
case req: LiControlledShutdownSkipSafetyCheckRequest => LiControlledShutdownSkipSafetyCheckRequestDataJsonConverter.write(req.data, request.version)
case req: LiCombinedControlRequest => LiCombinedControlRequestDataJsonConverter.write(req.data, request.version())
case req: LiMoveControllerRequest => LiMoveControllerRequestDataJsonConverter.write(req.data, request.version())
case req: LiFederatedTopicCreateRequest => LiFederatedTopicCreateRequestDataJsonConverter.write(req.data, request.version())
case req: LiFederatedTopicZnodeCreateRequest => LiFederatedTopicCreateRequestDataJsonConverter.write(req.data, request.version())
case _ => throw new IllegalStateException(s"ApiKey ${request.apiKey} is not currently handled in `request`, the " +
"code should be updated to do so.");
}
Expand Down Expand Up @@ -177,7 +177,7 @@ object RequestConvertToJson {
case res: LiControlledShutdownSkipSafetyCheckResponse => LiControlledShutdownSkipSafetyCheckResponseDataJsonConverter.write(res.data, version)
case res: LiCombinedControlResponse => LiCombinedControlResponseDataJsonConverter.write(res.data, version)
case res: LiMoveControllerResponse => LiMoveControllerResponseDataJsonConverter.write(res.data, version)
case res: LiFederatedTopicCreateResponse => LiFederatedTopicCreateResponseDataJsonConverter.write(res.data, version)
case res: LiFederatedTopicZnodeCreateResponse => LiFederatedTopicCreateResponseDataJsonConverter.write(res.data, version)
case _ => throw new IllegalStateException(s"ApiKey ${response.apiKey} is not currently handled in `response`, the " +
"code should be updated to do so.");
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3688,12 +3688,12 @@ class KafkaApis(val requestChannel: RequestChannel,
}

def handleMarkFederatedTopicRequest(request: RequestChannel.Request): Unit = {
val markFederatedTopicRequest = request.body[LiFederatedTopicCreateRequest]
val markFederatedTopicRequest = request.body[LiFederatedTopicZnodeCreateRequest]
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))

if (!zkSupport.controller.isActive) {
requestHelper.sendResponseExemptThrottle(request,
LiFederatedTopicCreateResponse.prepareResponse(Errors.NOT_CONTROLLER, 0, markFederatedTopicRequest.version())
LiFederatedTopicZnodeCreateResponse.prepareResponse(Errors.NOT_CONTROLLER, 0, markFederatedTopicRequest.version())
)
} else {
val hasClusterAuthorization = authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME,
Expand Down Expand Up @@ -3736,7 +3736,7 @@ class KafkaApis(val requestChannel: RequestChannel,
zkSupport.zkClient.createFederatedTopicZNode(entry._1, entry._2)
})
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
LiFederatedTopicCreateResponse.prepareResponse(Errors.NONE, requestThrottleMs, markFederatedTopicRequest.version())
LiFederatedTopicZnodeCreateResponse.prepareResponse(Errors.NONE, requestThrottleMs, markFederatedTopicRequest.version())
)
} catch {
case throwable: Throwable =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ class RequestQuotaTest extends BaseRequestTest {
new LiMoveControllerRequest.Builder(new LiMoveControllerRequestData(), ApiKeys.LI_MOVE_CONTROLLER.latestVersion)

case ApiKeys.LI_CREATE_FEDERATED_TOPIC_ZNODES =>
new LiFederatedTopicCreateRequest.Builder(new LiFederatedTopicCreateRequestData(), ApiKeys.LI_CREATE_FEDERATED_TOPIC_ZNODES.latestVersion)
new LiFederatedTopicZnodeCreateRequest.Builder(new LiFederatedTopicCreateRequestData(), ApiKeys.LI_CREATE_FEDERATED_TOPIC_ZNODES.latestVersion)

case _ =>
throw new IllegalArgumentException("Unsupported API key " + apiKey)
Expand Down

0 comments on commit 3dcde59

Please sign in to comment.