diff --git a/.azure/build-pipeline.yaml b/.azure/build-pipeline.yaml index 1c4a0bf36c..f374b6eeda 100644 --- a/.azure/build-pipeline.yaml +++ b/.azure/build-pipeline.yaml @@ -31,6 +31,8 @@ stages: artifactPipeline: '' artifactRunVersion: '' artifactRunId: '' + variables: + STRIMZI_TEST_CONTAINER_LOGGING_ENABLED: false # Builds Strimzi docs - stage: build_docs diff --git a/pom.xml b/pom.xml index f6fe900683..19e47a34cf 100644 --- a/pom.xml +++ b/pom.xml @@ -162,7 +162,7 @@ 1.3.2 1.2.0 5.8.2 - 0.109.0 + 0.109.1 5.13.2 3.14.7 1.1 @@ -185,10 +185,6 @@ ossrh https://oss.sonatype.org/content/repositories/snapshots - - ossrh - https://oss.sonatype.org/service/local/staging/deploy/maven2/ - diff --git a/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java b/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java index 30ce19eef3..c9f8463e9e 100644 --- a/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java +++ b/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java @@ -106,7 +106,15 @@ class TopicControllerIT implements TestSeparator { private static final Logger LOGGER = LogManager.getLogger(TopicControllerIT.class); private static final String NAMESPACE = TopicOperatorTestUtil.namespaceName(TopicControllerIT.class); - public static final Map SELECTOR = Map.of("foo", "FOO", "bar", "BAR"); + private static final Map SELECTOR = Map.of("foo", "FOO", "bar", "BAR"); + private static final Map TEST_TOPIC_CONFIG = Map.of( + TopicConfig.CLEANUP_POLICY_CONFIG, List.of("compact"), + TopicConfig.COMPRESSION_TYPE_CONFIG, "producer", + TopicConfig.FLUSH_MS_CONFIG, 1234L, + TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, 1234, + TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, 0.6, + TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, true + ); static KubernetesClient kubernetesClient; TopicOperatorMain operator; @@ -437,21 +445,14 @@ static List managedKafkaTopics() { static List managedKafkaTopicsWithConfigs() { var topicName = "topic" + System.nanoTime(); - var configs = Map.of( - TopicConfig.CLEANUP_POLICY_CONFIG, List.of("compact"), // list typed - TopicConfig.COMPRESSION_TYPE_CONFIG, "producer", // string typed - TopicConfig.FLUSH_MS_CONFIG, 1234L, // long typed - TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, 1234, // int typed - TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, 0.6, // double typed - TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, true // boolean typed - ); + return List.of( - kafkaTopic(NAMESPACE, topicName + "a", SELECTOR, null, true, topicName + "a", 2, 1, configs), - kafkaTopic(NAMESPACE, topicName + "b", SELECTOR, null, true, null, 2, 1, configs), - kafkaTopic(NAMESPACE, topicName + "c", SELECTOR, null, true, topicName + "c".toUpperCase(Locale.ROOT), 2, 1, configs), - kafkaTopic(NAMESPACE, topicName + "d", SELECTOR, null, null, topicName + "d", 2, 1, configs), - kafkaTopic(NAMESPACE, topicName + "e", SELECTOR, null, null, null, 2, 1, configs), - kafkaTopic(NAMESPACE, topicName + "f", SELECTOR, null, null, topicName + "f".toUpperCase(Locale.ROOT), 2, 1, configs) + kafkaTopic(NAMESPACE, topicName + "a", SELECTOR, null, true, topicName + "a", 2, 1, TEST_TOPIC_CONFIG), + kafkaTopic(NAMESPACE, topicName + "b", SELECTOR, null, true, null, 2, 1, TEST_TOPIC_CONFIG), + kafkaTopic(NAMESPACE, topicName + "c", SELECTOR, null, true, topicName + "c".toUpperCase(Locale.ROOT), 2, 1, TEST_TOPIC_CONFIG), + kafkaTopic(NAMESPACE, topicName + "d", SELECTOR, null, null, topicName + "d", 2, 1, TEST_TOPIC_CONFIG), + kafkaTopic(NAMESPACE, topicName + "e", SELECTOR, null, null, null, 2, 1, TEST_TOPIC_CONFIG), + kafkaTopic(NAMESPACE, topicName + "f", SELECTOR, null, null, topicName + "f".toUpperCase(Locale.ROOT), 2, 1, TEST_TOPIC_CONFIG) ); } @@ -851,14 +852,9 @@ private void shouldUpdateTopicInKafkaWhenConfigChangedInKube(StrimziKafkaCluster UnaryOperator> expectedChangedConfigs) throws ExecutionException, InterruptedException, TimeoutException { // given var expectedTopicName = TopicOperatorUtil.topicName(kt); - var expectedCreateConfigs = Map.of( - TopicConfig.CLEANUP_POLICY_CONFIG, "compact", // list typed - TopicConfig.COMPRESSION_TYPE_CONFIG, "producer", // string typed - TopicConfig.FLUSH_MS_CONFIG, "1234", // long typed - TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "1234", // int typed - TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.6", // double typed - TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true" // boolean typed - ); + var expectedCreateConfigs = TEST_TOPIC_CONFIG.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() instanceof List + ? String.join(",", (List) e.getValue()) : String.valueOf(e.getValue()))); Map expectedConfigs = expectedChangedConfigs.apply(expectedCreateConfigs); assertNotEquals(expectedCreateConfigs, expectedConfigs); @@ -881,7 +877,7 @@ public void shouldUpdateTopicInKafkaWhenStringConfigChangedInKube() throws Execu shouldUpdateTopicInKafkaWhenConfigChangedInKube(kafkaCluster, kt, TopicControllerIT::setSnappyCompression, expectedCreateConfigs -> { - Map expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs); + Map expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs); expectedUpdatedConfigs.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy"); return expectedUpdatedConfigs; }); @@ -900,7 +896,7 @@ public void shouldUpdateTopicInKafkaWhenIntConfigChangedInKube() throws Executio return theKt; }, expectedCreateConfigs -> { - Map expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs); + Map expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs); expectedUpdatedConfigs.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "5678"); return expectedUpdatedConfigs; }); @@ -919,7 +915,7 @@ public void shouldUpdateTopicInKafkaWhenLongConfigChangedInKube() throws Executi return theKt; }, expectedCreateConfigs -> { - Map expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs); + Map expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs); expectedUpdatedConfigs.put(TopicConfig.FLUSH_MS_CONFIG, "9876"); return expectedUpdatedConfigs; }); @@ -938,7 +934,7 @@ public void shouldUpdateTopicInKafkaWhenDoubleConfigChangedInKube() throws Execu return theKt; }, expectedCreateConfigs -> { - Map expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs); + Map expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs); expectedUpdatedConfigs.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.1"); return expectedUpdatedConfigs; }); @@ -957,7 +953,7 @@ public void shouldUpdateTopicInKafkaWhenBooleanConfigChangedInKube() throws Exec return theKt; }, expectedCreateConfigs -> { - Map expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs); + Map expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs); expectedUpdatedConfigs.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false"); return expectedUpdatedConfigs; }); @@ -976,7 +972,7 @@ public void shouldUpdateTopicInKafkaWhenListConfigChangedInKube() throws Executi return theKt; }, expectedCreateConfigs -> { - Map expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs); + Map expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs); expectedUpdatedConfigs.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete"); return expectedUpdatedConfigs; }); @@ -995,7 +991,7 @@ public void shouldUpdateTopicInKafkaWhenConfigRemovedInKube() throws ExecutionEx return theKt; }, expectedCreateConfigs -> { - var expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs); + var expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs); expectedUpdatedConfigs.remove(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG); return expectedUpdatedConfigs; }); @@ -1218,7 +1214,7 @@ public void shouldDeleteTopicFromKafkaWhenManagedTopicDeletedFromKube() throws E TopicOperatorTestUtil.waitUntilCondition(resource, Objects::isNull); // then - assertNotExistsInKafka(TopicOperatorUtil.topicName(kt)); + waitNotExistsInKafka(TopicOperatorUtil.topicName(kt)); } }