Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LIKAFKA-41423: Kafka federation proof of concept using dual-cluster integration test (ProxyBasedFederationTest) #326

Open
wants to merge 9 commits into
base: 2.4-li
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ public synchronized void update(int requestVersion, MetadataResponse response, l

String newClusterId = cache.cluster().clusterResource().clusterId();
if (!Objects.equals(previousClusterId, newClusterId)) {
log.info("Cluster ID: {}", newClusterId);
log.info("Cluster ID = {}", newClusterId);
}
clusterResourceListeners.onUpdate(cache.cluster().clusterResource());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1232,6 +1232,7 @@ private void handleCompletedReceives(List<ClientResponse> responses, long now) {
InFlightRequest req = inFlightRequests.completeNext(source);
Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
throttleTimeSensor, now);
// FIXME: probable perf concern: cache isTraceEnabled() outside the loop and use the cached boolean here
if (log.isTraceEnabled()) {
log.trace("Completed receive from node {} for {} with correlation id {}, received {}", req.destination,
req.header.apiKey(), req.header.correlationId(), responseStruct);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.LinkedList;
import java.util.Map;

import static java.util.Collections.singletonList;
Expand All @@ -50,16 +51,19 @@ public class UpdateMetadataRequest extends AbstractControlRequest {
public static class Builder extends AbstractControlRequest.Builder<UpdateMetadataRequest> {
private final List<UpdateMetadataPartitionState> partitionStates;
private final List<UpdateMetadataBroker> liveBrokers;
private final String originClusterId;
private Lock buildLock = new ReentrantLock();

// LIKAFKA-18349 - Cache the UpdateMetadataRequest Objects to reduce memory usage
private final Map<Short, UpdateMetadataRequest> requestCache = new HashMap<>();

public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch, long maxBrokerEpoch,
List<UpdateMetadataPartitionState> partitionStates, List<UpdateMetadataBroker> liveBrokers) {
List<UpdateMetadataPartitionState> partitionStates, List<UpdateMetadataBroker> liveBrokers,
String originClusterId) {
super(ApiKeys.UPDATE_METADATA, version, controllerId, controllerEpoch, brokerEpoch, maxBrokerEpoch);
this.partitionStates = partitionStates;
this.liveBrokers = liveBrokers;
this.originClusterId = originClusterId;
}

@Override
Expand Down Expand Up @@ -104,6 +108,12 @@ public UpdateMetadataRequest build(short version) {
data.setUngroupedPartitionStates(partitionStates);
}

// originClusterId == null implies federation is not enabled (though the converse may not hold); a null
// value will be ignored during serialization (data.toStruct())
if (version >= 7) {
data.setOriginClusterId(originClusterId);
}

updateMetadataRequest = new UpdateMetadataRequest(data, version);
requestCache.put(version, updateMetadataRequest);
}
Expand Down Expand Up @@ -158,6 +168,32 @@ public List<UpdateMetadataBroker> liveBrokers() {
}
}

/**
 * Pass-through "builder" around an UpdateMetadataRequest that has already been built. It exists so
 * that rewritten remote requests (i.e., those originating from controllers in other physical clusters
 * of a federated setup) can be submitted to the broker-queues in this controller's cluster, which
 * expect a builder rather than a finished request.
 */
public static class WrappingBuilder extends Builder {
    // The pre-built request handed back verbatim by build().
    private final UpdateMetadataRequest wrappedRequest;

    /**
     * Seeds the parent builder with the values read from the given request and remembers the request
     * itself.
     *
     * @param umr the already-built request to wrap
     */
    public WrappingBuilder(UpdateMetadataRequest umr) {
        super(umr.version(), umr.controllerId(), umr.controllerEpoch(), umr.brokerEpoch(), umr.maxBrokerEpoch(),
            toList(umr.partitionStates()), umr.liveBrokers(), umr.originClusterId());
        this.wrappedRequest = umr;
    }

    /**
     * Returns the wrapped request as-is; the requested version is not consulted.
     *
     * @param version ignored
     * @return the request passed to the constructor
     */
    @Override
    public UpdateMetadataRequest build(short version) {
        return wrappedRequest;
    }

    // Copies every element of the iterable, in iteration order, into a new list.
    private static <T> List<T> toList(Iterable<T> source) {
        List<T> copy = new LinkedList<>();
        for (T element : source) {
            copy.add(element);
        }
        return copy;
    }
}

private final UpdateMetadataRequestData data;
// LIKAFKA-18349 - Cache the UpdateMetadataRequest struct to reduce memory usage
private Struct struct = null;
Expand Down Expand Up @@ -214,6 +250,16 @@ public UpdateMetadataRequest(Struct struct, short version) {
this(new UpdateMetadataRequestData(struct, version), version);
}

/**
 * Federation support: returns the origin cluster ID stored in this request's underlying data.
 *
 * @return the value of {@code data.originClusterId()}; NOTE(review): the message schema declares this
 *         field nullable with a null default, so callers should presumably expect null - confirm
 */
public String originClusterId() {
return data.originClusterId();
}

/**
 * Federation support: returns the routing cluster ID stored in this request's underlying data.
 *
 * @return the value of {@code data.routingClusterId()}; NOTE(review): the message schema declares this
 *         field nullable with a null default, so callers should presumably expect null - confirm
 */
public String routingClusterId() {
return data.routingClusterId();
}

@Override
public int controllerId() {
return data.controllerId();
Expand Down Expand Up @@ -283,6 +329,26 @@ protected Struct toStruct() {
}
}

/**
 * Federation support: rewrites this request in place. While holding {@code structLock}, it overwrites
 * the routing cluster ID, controller ID and controller epoch on the underlying data, overwrites the
 * max broker epoch only when {@code version() >= 6}, and then drops the cached struct by setting it
 * to null.
 *
 * @param routingClusterId value stored via {@code data.setRoutingClusterId}
 * @param controllerId value stored via {@code data.setControllerId}
 * @param controllerEpoch value stored via {@code data.setControllerEpoch}
 * @param maxBrokerEpoch value stored via {@code data.setMaxBrokerEpoch}; only applied for version 6 and above
 */
public void rewriteRemoteRequest(String routingClusterId, int controllerId, int controllerEpoch, long maxBrokerEpoch) {
// FIXME? should we add a version check for 7+? federation should not be enabled with less than that...
structLock.lock();
try {
data.setRoutingClusterId(routingClusterId);
data.setControllerId(controllerId);
data.setControllerEpoch(controllerEpoch);
// brokerEpoch apparently gets rewritten by the controller for every receiving broker (somewhere...)
// before sending it to them: shouldn't need to mess with it here, right? or should we remove it in
// the version >= 6 case? FIXME?
if (version() >= 6) {
data.setMaxBrokerEpoch(maxBrokerEpoch);
}
struct = null; // invalidate cache (in case it's there)
} finally {
// always release the lock, even if one of the setters throws
structLock.unlock();
}
}

// Visible for testing
UpdateMetadataRequestData data() {
return data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@
"validVersions": "0-7",
"flexibleVersions": "7+",
"fields": [
{ "name": "OriginClusterId", "type": "string", "versions": "7+", "nullableVersions": "7+", "default": "null",
"taggedVersions": "7+", "tag": 0, "ignorable": true,
"about": "The clusterId if known. In federated clusters, this is the ID of the originating physical cluster, i.e., it matches the included broker info." },
{ "name": "RoutingClusterId", "type": "string", "versions": "7+", "nullableVersions": "7+", "default": "null",
"taggedVersions": "7+", "tag": 1, "ignorable": true,
"about": "The 'effective' or rewritten clusterId; for routing purposes, has precedence over OriginClusterId if present. In federated clusters, updates from other physical clusters must be modified by the local controller and then forwarded to local brokers, including to the broker half of the controller itself. This field allows the local controller to avoid infinite loops." },
{ "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
"about": "The controller id." },
{ "name": "ControllerEpoch", "type": "int32", "versions": "0+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1355,7 +1355,7 @@ private UpdateMetadataRequest createUpdateMetadataRequest(int version, String ra
.setRack(rack)
);
return new UpdateMetadataRequest.Builder((short) version, 1, 10, 0, 0, partitionStates,
liveBrokers).build();
liveBrokers, "dummyClusterId").build();
}

private UpdateMetadataResponse createUpdateMetadataResponse() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ public class UpdateMetadataRequestTest {
public void testUnsupportedVersion() {
UpdateMetadataRequest.Builder builder = new UpdateMetadataRequest.Builder(
(short) (UPDATE_METADATA.latestVersion() + 1), 0, 0, 0, 0,
Collections.emptyList(), Collections.emptyList());
Collections.emptyList(), Collections.emptyList(), "dummyClusterId");
assertThrows(UnsupportedVersionException.class, builder::build);
}

@Test
public void testGetErrorResponse() {
for (short version = UPDATE_METADATA.oldestVersion(); version < UPDATE_METADATA.latestVersion(); version++) {
UpdateMetadataRequest.Builder builder = new UpdateMetadataRequest.Builder(
version, 0, 0, 0, 0, Collections.emptyList(), Collections.emptyList());
version, 0, 0, 0, 0, Collections.emptyList(), Collections.emptyList(), "dummyClusterId");
UpdateMetadataRequest request = builder.build();
UpdateMetadataResponse response = request.getErrorResponse(0,
new ClusterAuthorizationException("Not authorized"));
Expand Down Expand Up @@ -149,7 +149,7 @@ public void testVersionLogic() {
);

UpdateMetadataRequest request = new UpdateMetadataRequest.Builder(version, 1, 2, 3, 3,
partitionStates, liveBrokers).build();
partitionStates, liveBrokers, "dummyClusterId").build();

assertEquals(new HashSet<>(partitionStates), iterableToSet(request.partitionStates()));
assertEquals(liveBrokers, request.liveBrokers());
Expand Down Expand Up @@ -201,7 +201,7 @@ public void testTopicPartitionGroupingSizeReduction() {
.setPartitionIndex(tp.partition()));
}
UpdateMetadataRequest.Builder builder = new UpdateMetadataRequest.Builder((short) 5, 0, 0, 0, 0,
partitionStates, Collections.emptyList());
partitionStates, Collections.emptyList(), "dummyClusterId");

assertTrue(MessageTestUtil.messageSize(builder.build((short) 5).data(), (short) 5) <
MessageTestUtil.messageSize(builder.build((short) 4).data(), (short) 4));
Expand Down
Loading