From 982b973989ceb35718f507decaef074070ebf782 Mon Sep 17 00:00:00 2001 From: Jakub Scholz Date: Wed, 25 Dec 2024 10:32:24 +0100 Subject: [PATCH] Clean-up the KraftUtils class Signed-off-by: Jakub Scholz --- .../operator/cluster/model/KRaftUtils.java | 56 --------------- .../operator/cluster/model/KafkaCluster.java | 24 ++++++- .../assembly/KafkaAssemblyOperator.java | 9 --- .../cluster/model/KRaftUtilsTest.java | 71 ------------------- .../cluster/model/KafkaClusterTest.java | 26 ++++++- 5 files changed, 48 insertions(+), 138 deletions(-) delete mode 100644 cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KRaftUtils.java delete mode 100644 cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KRaftUtilsTest.java diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KRaftUtils.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KRaftUtils.java deleted file mode 100644 index 34831d23c0..0000000000 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KRaftUtils.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright Strimzi authors. - * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). - */ -package io.strimzi.operator.cluster.model; - -import io.strimzi.api.kafka.model.kafka.KafkaSpec; -import io.strimzi.operator.common.model.InvalidResourceException; -import org.apache.kafka.server.common.MetadataVersion; - -import java.util.HashSet; -import java.util.Set; - -/** - * Shared methods for working with KRaft - */ -public class KRaftUtils { - /** - * In KRaft mode, multiple features are currently not supported. This method validates the Kafka CR for the - * unsupported features and if they are used, throws an InvalidResourceException exception. - * - * @param kafkaSpec The .spec section of the Kafka CR which should be checked - */ - public static void validateKafkaCrForKRaft(KafkaSpec kafkaSpec) { - Set errors = new HashSet<>(0); - - if (kafkaSpec == null) { - errors.add("The .spec section of the Kafka custom resource is missing"); - } - - if (!errors.isEmpty()) { - throw new InvalidResourceException("Kafka configuration is not valid: " + errors); - } - } - - /** - * Validates the metadata version - * - * @param metadataVersion Metadata version that should be validated - */ - public static void validateMetadataVersion(String metadataVersion) { - try { - MetadataVersion version = MetadataVersion.fromVersionString(metadataVersion); - - // KRaft is supposed to be supported from metadata version 3.0-IV1. But only from metadata version 3.3-IV0, - // the initial metadata version can be set using the kafka-storage.sh utility. And since most metadata - // versions do not support downgrade, that means 3.3-IV0 is the oldest metadata version that can be used - // with Strimzi. - if (version.isLessThan(MetadataVersion.IBP_3_3_IV0)) { - throw new InvalidResourceException("The oldest supported metadata version is 3.3-IV0"); - } - } catch (IllegalArgumentException e) { - throw new InvalidResourceException("Metadata version " + metadataVersion + " is invalid", e); - } - } -} diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java index 557ed772a8..7860002e46 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java @@ -87,6 +87,7 @@ import io.strimzi.operator.common.model.StatusUtils; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; +import org.apache.kafka.server.common.MetadataVersion; import java.io.IOException; import java.util.ArrayList; @@ -305,7 +306,7 @@ public static KafkaCluster fromCrd(Reconciliation reconciliation, // Validates and sets the metadata version used in KRaft if (versionChange.metadataVersion() != null) { - KRaftUtils.validateMetadataVersion(versionChange.metadataVersion()); + validateMetadataVersion(versionChange.metadataVersion()); result.metadataVersion = versionChange.metadataVersion(); } @@ -544,6 +545,27 @@ public KafkaPool nodePoolForNodeId(int nodeId) { throw new NodePoolNotFoundException("Node ID " + nodeId + " does not belong to any known node pool!"); } + /** + * Validates the metadata version + * + * @param metadataVersion Metadata version that should be validated + */ + /* test */ static void validateMetadataVersion(String metadataVersion) { + try { + MetadataVersion version = MetadataVersion.fromVersionString(metadataVersion); + + // KRaft is supposed to be supported from metadata version 3.0-IV1. But only from metadata version 3.3-IV0, + // the initial metadata version can be set using the kafka-storage.sh utility. And since most metadata + // versions do not support downgrade, that means 3.3-IV0 is the oldest metadata version that can be used + // with Strimzi. + if (version.isLessThan(MetadataVersion.IBP_3_3_IV0)) { + throw new InvalidResourceException("The oldest supported metadata version is 3.3-IV0"); + } + } catch (IllegalArgumentException e) { + throw new InvalidResourceException("Metadata version " + metadataVersion + " is invalid", e); + } + } + /** * Validates the Kafka broker configuration against the configuration options of the desired Kafka version. * diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperator.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperator.java index f6b9f37d74..a5bc024902 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperator.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperator.java @@ -27,7 +27,6 @@ import io.strimzi.operator.cluster.ClusterOperatorConfig; import io.strimzi.operator.cluster.PlatformFeaturesAvailability; import io.strimzi.operator.cluster.model.ClusterCa; -import io.strimzi.operator.cluster.model.KRaftUtils; import io.strimzi.operator.cluster.model.KafkaCluster; import io.strimzi.operator.cluster.model.KafkaVersionChange; import io.strimzi.operator.cluster.model.ModelUtils; @@ -44,7 +43,6 @@ import io.strimzi.operator.common.Util; import io.strimzi.operator.common.config.ConfigParameter; import io.strimzi.operator.common.model.ClientsCa; -import io.strimzi.operator.common.model.InvalidResourceException; import io.strimzi.operator.common.model.Labels; import io.strimzi.operator.common.model.NamespaceAndName; import io.strimzi.operator.common.model.PasswordGenerator; @@ -232,13 +230,6 @@ Future reconcile(ReconciliationState reconcileState) { if (nonMigratedCluster || !kraftEnabled || !nodePoolsEnabled) { throw new InvalidConfigurationException("Strimzi " + OPERATOR_VERSION + " supports only KRaft-based Apache Kafka clusters. Please make sure your cluster is migrated to KRaft before using Strimzi " + OPERATOR_VERSION + "."); - } else { - // Validates features which are currently not supported in KRaft mode - try { - KRaftUtils.validateKafkaCrForKRaft(reconcileState.kafkaAssembly.getSpec()); - } catch (InvalidResourceException e) { - return Future.failedFuture(e); - } } reconcileState.initialStatus() diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KRaftUtilsTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KRaftUtilsTest.java deleted file mode 100644 index c97f98011b..0000000000 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KRaftUtilsTest.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright Strimzi authors. - * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). - */ -package io.strimzi.operator.cluster.model; - -import io.strimzi.api.kafka.model.kafka.KafkaSpec; -import io.strimzi.api.kafka.model.kafka.KafkaSpecBuilder; -import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListenerBuilder; -import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerType; -import io.strimzi.operator.common.model.InvalidResourceException; -import io.strimzi.test.annotations.ParallelSuite; -import io.strimzi.test.annotations.ParallelTest; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.is; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertThrows; - -@ParallelSuite -public class KRaftUtilsTest { - @ParallelTest - public void testValidKafka() { - KafkaSpec spec = new KafkaSpecBuilder() - .withNewKafka() - .withListeners(new GenericKafkaListenerBuilder() - .withName("listener") - .withPort(9092) - .withTls(true) - .withType(KafkaListenerType.INTERNAL) - .build()) - .withNewKafkaAuthorizationOpa() - .withUrl("http://opa:8080") - .endKafkaAuthorizationOpa() - .endKafka() - .build(); - - assertDoesNotThrow(() -> KRaftUtils.validateKafkaCrForKRaft(spec)); - } - - @ParallelTest - public void testInvalidKafka() { - InvalidResourceException ex = assertThrows(InvalidResourceException.class, () -> KRaftUtils.validateKafkaCrForKRaft(null)); - assertThat(ex.getMessage(), is("Kafka configuration is not valid: [The .spec section of the Kafka custom resource is missing]")); - } - - @ParallelTest - public void testKRaftMetadataVersionValidation() { - // Valid values - assertDoesNotThrow(() -> KRaftUtils.validateMetadataVersion("3.6")); - assertDoesNotThrow(() -> KRaftUtils.validateMetadataVersion("3.6-IV2")); - - // Minimum supported versions - assertDoesNotThrow(() -> KRaftUtils.validateMetadataVersion("3.3")); - assertDoesNotThrow(() -> KRaftUtils.validateMetadataVersion("3.3-IV0")); - - // Invalid Values - InvalidResourceException e = assertThrows(InvalidResourceException.class, () -> KRaftUtils.validateMetadataVersion("3.6-IV9")); - assertThat(e.getMessage(), containsString("Metadata version 3.6-IV9 is invalid")); - - e = assertThrows(InvalidResourceException.class, () -> KRaftUtils.validateMetadataVersion("3")); - assertThat(e.getMessage(), containsString("Metadata version 3 is invalid")); - - e = assertThrows(InvalidResourceException.class, () -> KRaftUtils.validateMetadataVersion("3.2")); - assertThat(e.getMessage(), containsString("The oldest supported metadata version is 3.3-IV0")); - - e = assertThrows(InvalidResourceException.class, () -> KRaftUtils.validateMetadataVersion("3.2-IV0")); - assertThat(e.getMessage(), containsString("The oldest supported metadata version is 3.3-IV0")); - } -} diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaClusterTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaClusterTest.java index 346bf8ddd4..f398011466 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaClusterTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaClusterTest.java @@ -2190,7 +2190,31 @@ public void testInvalidVersionWithCustomImage() { } @ParallelTest - public void testKRaftMetadataVersionValidation() { + public void testKRaftMetadataVersionValidation() { + // Valid values + assertDoesNotThrow(() -> KafkaCluster.validateMetadataVersion("3.6")); + assertDoesNotThrow(() -> KafkaCluster.validateMetadataVersion("3.6-IV2")); + + // Minimum supported versions + assertDoesNotThrow(() -> KafkaCluster.validateMetadataVersion("3.3")); + assertDoesNotThrow(() -> KafkaCluster.validateMetadataVersion("3.3-IV0")); + + // Invalid Values + InvalidResourceException e = assertThrows(InvalidResourceException.class, () -> KafkaCluster.validateMetadataVersion("3.6-IV9")); + assertThat(e.getMessage(), containsString("Metadata version 3.6-IV9 is invalid")); + + e = assertThrows(InvalidResourceException.class, () -> KafkaCluster.validateMetadataVersion("3")); + assertThat(e.getMessage(), containsString("Metadata version 3 is invalid")); + + e = assertThrows(InvalidResourceException.class, () -> KafkaCluster.validateMetadataVersion("3.2")); + assertThat(e.getMessage(), containsString("The oldest supported metadata version is 3.3-IV0")); + + e = assertThrows(InvalidResourceException.class, () -> KafkaCluster.validateMetadataVersion("3.2-IV0")); + assertThat(e.getMessage(), containsString("The oldest supported metadata version is 3.3-IV0")); + } + + @ParallelTest + public void testKRaftMetadataVersionValidationInFromCrd() { Kafka kafka = new KafkaBuilder(KAFKA) .editSpec() .editKafka()