Skip to content

Commit

Permalink
Add support for KRaft in KafkaRoller (strimzi#9146)
Browse files Browse the repository at this point in the history
Signed-off-by: Gantigmaa Selenge <[email protected]>
Signed-off-by: Katherine Stanley <[email protected]>
Co-authored-by: Katherine Stanley <[email protected]>
  • Loading branch information
tinaselenge and katheris authored Nov 27, 2023
1 parent c7c2f89 commit adbdd7b
Show file tree
Hide file tree
Showing 8 changed files with 874 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,9 @@ protected Future<Void> manualRollingUpdate() {

return RestartReasons.of(RestartReason.MANUAL_ROLLING_UPDATE);
},
Map.of(),
Map.of(),
// Pass empty advertised hostnames and ports for the nodes
nodes.stream().collect(Collectors.toMap(NodeRef::nodeId, node -> Map.of())),
nodes.stream().collect(Collectors.toMap(NodeRef::nodeId, node -> Map.of())),
false
);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.strimzi.kafka.config.model.Scope;
import io.strimzi.operator.cluster.model.KafkaConfiguration;
import io.strimzi.operator.cluster.model.KafkaVersion;
import io.strimzi.operator.cluster.model.NodeRef;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.ReconciliationLogger;
import io.strimzi.operator.common.model.OrderedProperties;
Expand All @@ -41,7 +42,7 @@ public class KafkaBrokerConfigurationDiff extends AbstractJsonDiff {
private static final String PLACE_HOLDER = Pattern.quote("STRIMZI_BROKER_ID");

private final Reconciliation reconciliation;
private final Collection<AlterConfigOp> diff;
private final Collection<AlterConfigOp> brokerConfigDiff;
private final Map<String, ConfigModel> configModel;

/**
Expand All @@ -65,19 +66,23 @@ public class KafkaBrokerConfigurationDiff extends AbstractJsonDiff {
+ "|zookeeper\\.clientCnxnSocket"
+ "|broker\\.rack)$");

/**
 * KRaft controller configuration options are skipped if the node is not a combined (broker and controller) node
 */
private static final Pattern IGNORABLE_CONTROLLER_PROPERTIES = Pattern.compile("controller\\.quorum\\..*");
/**
* Constructor
*
* @param reconciliation Reconciliation marker
* @param brokerConfigs Broker configuration from Kafka Admin API
* @param desired Desired configuration
* @param kafkaVersion Kafka version
* @param brokerId Broker ID
* @param brokerNodeRef Broker node reference
*/
protected KafkaBrokerConfigurationDiff(Reconciliation reconciliation, Config brokerConfigs, String desired, KafkaVersion kafkaVersion, int brokerId) {
protected KafkaBrokerConfigurationDiff(Reconciliation reconciliation, Config brokerConfigs, String desired, KafkaVersion kafkaVersion, NodeRef brokerNodeRef) {
this.reconciliation = reconciliation;
this.configModel = KafkaConfiguration.readConfigModel(kafkaVersion);
this.diff = diff(brokerId, desired, brokerConfigs, configModel);
this.brokerConfigDiff = diff(brokerNodeRef, desired, brokerConfigs, configModel);
}

private static void fillPlaceholderValue(Map<String, String> orderedProperties, String value) {
Expand All @@ -92,7 +97,7 @@ private static void fillPlaceholderValue(Map<String, String> orderedProperties,
*/
protected boolean canBeUpdatedDynamically() {
boolean result = true;
for (AlterConfigOp entry : diff) {
for (AlterConfigOp entry : brokerConfigDiff) {
if (isEntryReadOnly(entry.configEntry())) {
result = false;
LOGGER.infoCr(reconciliation, "Configuration can't be updated dynamically due to: {}", entry);
Expand All @@ -115,29 +120,34 @@ private boolean isEntryReadOnly(ConfigEntry entry) {
* @return Collection of AlterConfigOp containing difference between current and desired configuration
*/
protected Collection<AlterConfigOp> getConfigDiff() {
return diff;
return brokerConfigDiff;
}

/**
* @return The number of broker configs which are different.
*/
protected int getDiffSize() {
return diff.size();
return brokerConfigDiff.size();
}

private static boolean isIgnorableProperty(String key) {
return IGNORABLE_PROPERTIES.matcher(key).matches();
private static boolean isIgnorableProperty(final String key, final boolean nodeIsController) {
// If node is not a KRaft controller, ignore KRaft controller config properties.
if (!nodeIsController) {
return IGNORABLE_PROPERTIES.matcher(key).matches() || IGNORABLE_CONTROLLER_PROPERTIES.matcher(key).matches();
} else {
return IGNORABLE_PROPERTIES.matcher(key).matches();
}
}

/**
* Computes diff between two maps. Entries in IGNORABLE_PROPERTIES are skipped
* @param brokerId id of compared broker
* @param brokerNodeRef broker node reference of compared broker
* @param desired desired configuration, may be null if the related ConfigMap does not exist yet or no changes are required
* @param brokerConfigs current configuration
* @param configModel default configuration for {@code kafkaVersion} of broker
* @return Collection of AlterConfigOp containing all entries which were changed from current in desired configuration
*/
private Collection<AlterConfigOp> diff(int brokerId, String desired,
private Collection<AlterConfigOp> diff(NodeRef brokerNodeRef, String desired,
Config brokerConfigs,
Map<String, ConfigModel> configModel) {
if (brokerConfigs == null || desired == null) {
Expand All @@ -156,7 +166,7 @@ private Collection<AlterConfigOp> diff(int brokerId, String desired,
orderedProperties.addStringPairs(desired);
Map<String, String> desiredMap = orderedProperties.asMap();

fillPlaceholderValue(desiredMap, Integer.toString(brokerId));
fillPlaceholderValue(desiredMap, Integer.toString(brokerNodeRef.nodeId()));

JsonNode source = PATCH_MAPPER.valueToTree(currentMap);
JsonNode target = PATCH_MAPPER.valueToTree(desiredMap);
Expand All @@ -174,25 +184,25 @@ private Collection<AlterConfigOp> diff(int brokerId, String desired,
if (optEntry.isPresent()) {
ConfigEntry entry = optEntry.get();
if ("remove".equals(op)) {
removeProperty(configModel, updatedCE, pathValueWithoutSlash, entry);
removeProperty(configModel, updatedCE, pathValueWithoutSlash, entry, brokerNodeRef.controller());
} else if ("replace".equals(op)) {
// entry is in the current, desired is updated value
updateOrAdd(entry.name(), configModel, desiredMap, updatedCE);
updateOrAdd(entry.name(), configModel, desiredMap, updatedCE, brokerNodeRef.controller());
}
} else {
if ("add".equals(op)) {
// entry is not in the current, it is added
updateOrAdd(pathValueWithoutSlash, configModel, desiredMap, updatedCE);
updateOrAdd(pathValueWithoutSlash, configModel, desiredMap, updatedCE, brokerNodeRef.controller());
}
}

if ("remove".equals(op)) {
// there is a lot of properties set by default - not having them in desired causes very noisy log output
LOGGER.traceCr(reconciliation, "Kafka Broker {} Config Differs : {}", brokerId, d);
LOGGER.traceCr(reconciliation, "Kafka Broker {} Config Differs : {}", brokerNodeRef.nodeId(), d);
LOGGER.traceCr(reconciliation, "Current Kafka Broker Config path {} has value {}", pathValueWithoutSlash, lookupPath(source, pathValue));
LOGGER.traceCr(reconciliation, "Desired Kafka Broker Config path {} has value {}", pathValueWithoutSlash, lookupPath(target, pathValue));
} else {
LOGGER.debugCr(reconciliation, "Kafka Broker {} Config Differs : {}", brokerId, d);
LOGGER.debugCr(reconciliation, "Kafka Broker {} Config Differs : {}", brokerNodeRef.nodeId(), d);
LOGGER.debugCr(reconciliation, "Current Kafka Broker Config path {} has value {}", pathValueWithoutSlash, lookupPath(source, pathValue));
LOGGER.debugCr(reconciliation, "Desired Kafka Broker Config path {} has value {}", pathValueWithoutSlash, lookupPath(target, pathValue));
}
Expand All @@ -201,8 +211,8 @@ private Collection<AlterConfigOp> diff(int brokerId, String desired,
return updatedCE;
}

private void updateOrAdd(String propertyName, Map<String, ConfigModel> configModel, Map<String, String> desiredMap, Collection<AlterConfigOp> updatedCE) {
if (!isIgnorableProperty(propertyName)) {
private void updateOrAdd(String propertyName, Map<String, ConfigModel> configModel, Map<String, String> desiredMap, Collection<AlterConfigOp> updatedCE, boolean nodeIsController) {
if (!isIgnorableProperty(propertyName, nodeIsController)) {
if (isCustomEntry(propertyName, configModel)) {
LOGGER.traceCr(reconciliation, "custom property {} has been updated/added {}", propertyName, desiredMap.get(propertyName));
} else {
Expand All @@ -214,7 +224,7 @@ private void updateOrAdd(String propertyName, Map<String, ConfigModel> configMod
}
}

private void removeProperty(Map<String, ConfigModel> configModel, Collection<AlterConfigOp> updatedCE, String pathValueWithoutSlash, ConfigEntry entry) {
private void removeProperty(Map<String, ConfigModel> configModel, Collection<AlterConfigOp> updatedCE, String pathValueWithoutSlash, ConfigEntry entry, boolean nodeIsController) {
if (isCustomEntry(entry.name(), configModel)) {
// we are deleting custom option
LOGGER.traceCr(reconciliation, "removing custom property {}", entry.name());
Expand All @@ -227,7 +237,7 @@ private void removeProperty(Map<String, ConfigModel> configModel, Collection<Alt
} else {
// entry is in current, is not in desired, is not default -> it was using non-default value and was removed
// if the entry was custom, it should be deleted
if (!isIgnorableProperty(pathValueWithoutSlash)) {
if (!isIgnorableProperty(pathValueWithoutSlash, nodeIsController)) {
updatedCE.add(new AlterConfigOp(new ConfigEntry(pathValueWithoutSlash, null), AlterConfigOp.OpType.DELETE));
LOGGER.infoCr(reconciliation, "{} not set in desired, unsetting back to default {}", entry.name(), "deleted entry");
} else {
Expand All @@ -241,7 +251,7 @@ private void removeProperty(Map<String, ConfigModel> configModel, Collection<Alt
*/
@Override
public boolean isEmpty() {
return diff.size() == 0;
return brokerConfigDiff.size() == 0;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.operator.cluster.operator.resource;

import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.ReconciliationLogger;
import io.strimzi.operator.common.VertxUtil;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.QuorumInfo;

/**
 * Provides methods to determine whether it's safe to restart a KRaft controller and identify the quorum leader id.
 * Restarting a KRaft controller is considered safe if the majority of controllers, excluding the one being
 * considered for restart, have caught up with the quorum leader within the specified timeout period defined by
 * controller.quorum.fetch.timeout.ms.
 */
class KafkaQuorumCheck {

    private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(KafkaQuorumCheck.class.getName());
    private final Reconciliation reconciliation;
    // Kafka Admin client used to query the metadata quorum state
    private final Admin admin;
    // Vert.x instance used to bridge KafkaFuture results into Vert.x Futures
    private final Vertx vertx;
    // Value of controller.quorum.fetch.timeout.ms: the maximum lag (ms) between a follower's
    // lastCaughtUpTimestamp and the leader's before the follower counts as fallen behind
    private final long controllerQuorumFetchTimeoutMs;

    /**
     * Constructor
     *
     * @param reconciliation                    Reconciliation marker
     * @param ac                                Kafka Admin client used to describe the metadata quorum
     * @param vertx                             Vert.x instance for executing the Admin calls
     * @param controllerQuorumFetchTimeoutMs    Configured controller.quorum.fetch.timeout.ms value
     */
    protected KafkaQuorumCheck(Reconciliation reconciliation, Admin ac, Vertx vertx, long controllerQuorumFetchTimeoutMs) {
        this.reconciliation = reconciliation;
        this.admin = ac;
        this.vertx = vertx;
        this.controllerQuorumFetchTimeoutMs = controllerQuorumFetchTimeoutMs;
    }

    /**
     * Returns future that completes with true if the given controller can be rolled based on the quorum state. Quorum is considered
     * healthy if the majority of controllers, excluding the given node, have caught up with the quorum leader within the
     * controller.quorum.fetch.timeout.ms.
     *
     * @param nodeId    ID of the controller node being considered for restart
     * @return          Future completing with true when the quorum stays healthy without the node,
     *                  false when it would not, or failing if the quorum state cannot be described
     */
    Future<Boolean> canRollController(int nodeId) {
        LOGGER.debugCr(reconciliation, "Determining whether controller pod {} can be rolled", nodeId);
        return describeMetadataQuorum().map(info -> {
            boolean canRoll = isQuorumHealthyWithoutNode(nodeId, info);
            if (!canRoll) {
                LOGGER.debugCr(reconciliation, "Not restarting controller pod {}. Restart would affect the quorum health", nodeId);
            }
            return canRoll;
        }).recover(error -> {
            // Log the failure for context, then propagate it so the caller can decide how to react
            LOGGER.warnCr(reconciliation, "Error determining whether it is safe to restart controller pod {}", nodeId, error);
            return Future.failedFuture(error);
        });
    }

    /**
     * Returns id of the quorum leader.
     *
     * @return  Future completing with the quorum leader's node id, or failing if the quorum state cannot be described
     **/
    Future<Integer> quorumLeaderId() {
        LOGGER.debugCr(reconciliation, "Determining the controller quorum leader id");
        return describeMetadataQuorum().map(QuorumInfo::leaderId).recover(error -> {
            LOGGER.warnCr(reconciliation, "Error determining the controller quorum leader id", error);
            return Future.failedFuture(error);
        });
    }

    /**
     * Returns true if the majority of the controllers' lastCaughtUpTimestamps are within
     * the controller.quorum.fetch.timeout.ms based on the given quorum info.
     * The given nodeIdToRestart is the one being considered to restart, therefore excluded from the check.
     **/
    private boolean isQuorumHealthyWithoutNode(int nodeIdToRestart, QuorumInfo info) {
        int leaderId = info.leaderId();
        // A negative leader id means no active quorum leader is known — never safe to roll in that state
        if (leaderId < 0) {
            LOGGER.warnCr(reconciliation, "No controller quorum leader is found because the leader id is set to {}", leaderId);
            return false;
        }

        // Map of voter node id -> lastCaughtUpTimestamp; -1 marks a voter with no timestamp reported
        Map<Integer, Long> controllerStates = info.voters().stream().collect(Collectors.toMap(
                QuorumInfo.ReplicaState::replicaId,
                state -> state.lastCaughtUpTimestamp().isPresent() ? state.lastCaughtUpTimestamp().getAsLong() : -1));
        int totalNumOfControllers = controllerStates.size();

        // Single-controller quorum: rolling it necessarily takes the quorum down, but there is no
        // alternative, so allow the roll with a loud warning
        if (totalNumOfControllers == 1) {
            LOGGER.warnCr(reconciliation, "Performing rolling update on a controller quorum with a single node. This may result in data loss " +
                    "or may cause disruption to the cluster during the rolling update. It is recommended that a minimum of three controllers are used.");
            return true;
        }

        long leaderLastCaughtUpTimestamp = controllerStates.get(leaderId);
        LOGGER.debugCr(reconciliation, "The lastCaughtUpTimestamp for the controller quorum leader (node id {}) is {}", leaderId, leaderLastCaughtUpTimestamp);

        // Count voters that are caught up with the leader (the leader itself always counts),
        // excluding the node we are considering to restart
        long numOfCaughtUpControllers = controllerStates.entrySet().stream().filter(entry -> {
            int controllerNodeId = entry.getKey();
            long lastCaughtUpTimestamp = entry.getValue();
            if (lastCaughtUpTimestamp < 0) {
                LOGGER.warnCr(reconciliation, "No valid lastCaughtUpTimestamp is found for controller {} ", controllerNodeId);
            } else {
                LOGGER.debugCr(reconciliation, "The lastCaughtUpTimestamp for controller {} is {}", controllerNodeId, lastCaughtUpTimestamp);
                if (controllerNodeId == leaderId || (leaderLastCaughtUpTimestamp - lastCaughtUpTimestamp) < controllerQuorumFetchTimeoutMs) {

                    // skip the controller that we are considering to roll
                    if (controllerNodeId != nodeIdToRestart) {
                        return true;
                    }
                    // NOTE(review): this "has caught up" debug message is only reached for the
                    // restart-candidate node itself (other caught-up nodes return above) — confirm
                    // whether it was intended to log for every caught-up controller
                    LOGGER.debugCr(reconciliation, "Controller {} has caught up with the controller quorum leader", controllerNodeId);
                } else {
                    LOGGER.debugCr(reconciliation, "Controller {} has fallen behind the controller quorum leader", controllerNodeId);
                }
            }
            return false;
        }).count();

        LOGGER.debugCr(reconciliation, "Out of {} controllers, there are {} that have caught up with the controller quorum leader, not including controller {}", totalNumOfControllers, numOfCaughtUpControllers, nodeIdToRestart);

        if (totalNumOfControllers == 2) {

            // Only roll the controller if the other one in the quorum has caught up or is the active controller.
            if (numOfCaughtUpControllers == 1) {
                LOGGER.warnCr(reconciliation, "Performing rolling update on a controller quorum with 2 nodes. This may result in data loss " +
                        "or cause disruption to the cluster during the rolling update. It is recommended that a minimum of three controllers are used.");
                return true;
            } else {
                return false;
            }
        } else {
            // General case: the remaining caught-up voters must still form a strict majority of the
            // full quorum, e.g. 2 of 3 or 3 of 5 ((n + 2) / 2 in integer arithmetic)
            return numOfCaughtUpControllers >= (totalNumOfControllers + 2) / 2;
        }
    }

    // Describes the metadata quorum via the Admin client, adapting the KafkaFuture to a Vert.x Future
    private Future<QuorumInfo> describeMetadataQuorum() {
        return VertxUtil.kafkaFutureToVertxFuture(reconciliation, vertx, admin.describeMetadataQuorum().quorumInfo());
    }
}
Loading

0 comments on commit adbdd7b

Please sign in to comment.