Skip to content

Commit

Permalink
Clean-up the KafkaClusterCreator class after ZooKeeper removal (#10997
Browse files Browse the repository at this point in the history
)

Signed-off-by: Jakub Scholz <[email protected]>
  • Loading branch information
scholzj authored Dec 29, 2024
1 parent 32c0a3e commit abed0e5
Showing 1 changed file with 28 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import io.strimzi.api.kafka.model.common.Condition;
import io.strimzi.api.kafka.model.kafka.Kafka;
import io.strimzi.api.kafka.model.kafka.KafkaBuilder;
import io.strimzi.api.kafka.model.kafka.KafkaStatus;
import io.strimzi.api.kafka.model.kafka.Storage;
import io.strimzi.api.kafka.model.nodepool.KafkaNodePool;
Expand Down Expand Up @@ -120,9 +119,9 @@ public Future<KafkaCluster> prepareKafkaCluster(
this.scalingDownBlockedNodes.addAll(kafka.removedNodes());
// We have a failure, and should try to fix issues
// Once we fix it, we call this method again, but this time with tryToFixProblems set to false
return revertScaleDown(kafka, kafkaCr, nodePools)
.compose(kafkaAndNodePools -> revertRoleChange(kafkaAndNodePools.kafkaCr(), kafkaAndNodePools.nodePoolCrs()))
.compose(kafkaAndNodePools -> prepareKafkaCluster(kafkaAndNodePools.kafkaCr(), kafkaAndNodePools.nodePoolCrs(), oldStorage, versionChange, kafkaStatus, false));
return revertScaleDown(nodePools)
.compose(revertedNodePools -> revertRoleChange(revertedNodePools))
.compose(revertedNodePools -> prepareKafkaCluster(kafkaCr, revertedNodePools, oldStorage, versionChange, kafkaStatus, false));
} else if (checkFailed()) {
// We have a failure, but we should not try to fix it
List<String> errors = new ArrayList<>();
Expand Down Expand Up @@ -212,70 +211,49 @@ private Future<KafkaCluster> brokerRemovalCheck(Kafka kafkaCr, KafkaCluster kafk
/**
* Reverts the broker scale down if it is not allowed because the brokers are not empty
*
* @param kafka Instance of the Kafka cluster model that contains information needed to revert the changes
* @param kafkaCr Kafka custom resource
* @param nodePoolCrs List with KafkaNodePool custom resources
*
* @return Future with KafkaAndNodePools record containing the fixed Kafka and KafkaNodePool CRs
*/
@SuppressWarnings("deprecation") // Replicas in Kafka CR are deprecated and its use here will be removed in the future in a separate PR
private Future<KafkaAndNodePools> revertScaleDown(KafkaCluster kafka, Kafka kafkaCr, List<KafkaNodePool> nodePoolCrs) {
private Future<List<KafkaNodePool>> revertScaleDown(List<KafkaNodePool> nodePoolCrs) {
if (scaleDownCheckFailed) {
if (nodePoolCrs == null || nodePoolCrs.isEmpty()) {
// There are no node pools => the Kafka CR is used
int newReplicasCount = kafkaCr.getSpec().getKafka().getReplicas() + kafka.removedNodes().size();
warningConditions.add(StatusUtils.buildWarningCondition("ScaleDownPreventionCheck", "Reverting scale-down of Kafka " + kafkaCr.getMetadata().getName() + " by changing number of replicas to " + newReplicasCount));
LOGGER.warnCr(reconciliation, "Reverting scale-down of Kafka {} by changing number of replicas to {}", kafkaCr.getMetadata().getName(), newReplicasCount);

Kafka newKafkaCr = new KafkaBuilder(kafkaCr)
.editSpec()
.editKafka()
.withReplicas(newReplicasCount)
.endKafka()
.endSpec()
.build();

return Future.succeededFuture(new KafkaAndNodePools(newKafkaCr, nodePoolCrs));
} else {
// Node pools are used -> we have to fix scale down in the KafkaNodePools
List<KafkaNodePool> newNodePools = new ArrayList<>();
// Node pools are used -> we have to fix scale down in the KafkaNodePools
List<KafkaNodePool> newNodePools = new ArrayList<>();

for (KafkaNodePool nodePool : nodePoolCrs) {
if (nodePool.getStatus() != null
&& nodePool.getStatus().getRoles().contains(ProcessRoles.BROKER)
&& nodePool.getStatus().getNodeIds() != null
&& nodePool.getSpec().getReplicas() < nodePool.getStatus().getNodeIds().size()) {
int newReplicasCount = nodePool.getStatus().getNodeIds().size();
warningConditions.add(StatusUtils.buildWarningCondition("ScaleDownPreventionCheck", "Reverting scale-down of KafkaNodePool " + nodePool.getMetadata().getName() + " by changing number of replicas to " + newReplicasCount));
LOGGER.warnCr(reconciliation, "Reverting scale-down of KafkaNodePool {} by changing number of replicas to {}", nodePool.getMetadata().getName(), newReplicasCount);
newNodePools.add(
new KafkaNodePoolBuilder(nodePool)
.editSpec()
.withReplicas(newReplicasCount)
.endSpec()
.build());
} else {
newNodePools.add(nodePool);
}
for (KafkaNodePool nodePool : nodePoolCrs) {
if (nodePool.getStatus() != null
&& nodePool.getStatus().getRoles().contains(ProcessRoles.BROKER)
&& nodePool.getStatus().getNodeIds() != null
&& nodePool.getSpec().getReplicas() < nodePool.getStatus().getNodeIds().size()) {
int newReplicasCount = nodePool.getStatus().getNodeIds().size();
warningConditions.add(StatusUtils.buildWarningCondition("ScaleDownPreventionCheck", "Reverting scale-down of KafkaNodePool " + nodePool.getMetadata().getName() + " by changing number of replicas to " + newReplicasCount));
LOGGER.warnCr(reconciliation, "Reverting scale-down of KafkaNodePool {} by changing number of replicas to {}", nodePool.getMetadata().getName(), newReplicasCount);
newNodePools.add(
new KafkaNodePoolBuilder(nodePool)
.editSpec()
.withReplicas(newReplicasCount)
.endSpec()
.build());
} else {
newNodePools.add(nodePool);
}

return Future.succeededFuture(new KafkaAndNodePools(kafkaCr, newNodePools));
}

return Future.succeededFuture(newNodePools);
} else {
// The scale-down check did not fail => return the original resources
return Future.succeededFuture(new KafkaAndNodePools(kafkaCr, nodePoolCrs));
return Future.succeededFuture(nodePoolCrs);
}
}

/**
* Reverts the role change when the broker role is removed from a node that has still assigned partition replicas
*
* @param kafkaCr Kafka custom resource
* @param nodePoolCrs List with KafkaNodePool custom resources
*
* @return Future with KafkaAndNodePools record containing the fixed Kafka and KafkaNodePool CRs
*/
private Future<KafkaAndNodePools> revertRoleChange(Kafka kafkaCr, List<KafkaNodePool> nodePoolCrs) {
private Future<List<KafkaNodePool>> revertRoleChange(List<KafkaNodePool> nodePoolCrs) {
if (usedToBeBrokersCheckFailed) {
List<KafkaNodePool> newNodePools = new ArrayList<>();

Expand All @@ -296,10 +274,10 @@ private Future<KafkaAndNodePools> revertRoleChange(Kafka kafkaCr, List<KafkaNode
}
}

return Future.succeededFuture(new KafkaAndNodePools(kafkaCr, newNodePools));
return Future.succeededFuture(newNodePools);
} else {
// The used-to-be-brokers check did not fail => return the original resources
return Future.succeededFuture(new KafkaAndNodePools(kafkaCr, nodePoolCrs));
return Future.succeededFuture(nodePoolCrs);
}
}

Expand Down Expand Up @@ -353,12 +331,4 @@ public static KafkaCluster createKafkaCluster(
String clusterId = NodePoolUtils.getOrGenerateKRaftClusterId(kafkaCr, nodePoolCrs);
return KafkaCluster.fromCrd(reconciliation, kafkaCr, pools, versions, versionChange, clusterId, sharedEnvironmentProvider);
}

/**
* Utility record to pass fixed custom resources between methods
*
* @param kafkaCr Kafka custom resource
* @param nodePoolCrs List of KafkaNodePool resources
*/
record KafkaAndNodePools(Kafka kafkaCr, List<KafkaNodePool> nodePoolCrs) { }
}

0 comments on commit abed0e5

Please sign in to comment.