From 67d00e25e941f73be8b959c6732ac4db1d1083bf Mon Sep 17 00:00:00 2001 From: dengziming Date: Thu, 19 May 2022 00:39:26 +0800 Subject: [PATCH] MINOR: Enable some AdminClient integration tests (#12110) Enable KRaft in `AdminClientWithPoliciesIntegrationTes`t and `PlaintextAdminIntegrationTest`. There are some tests not enabled or not as expected yet: - testNullConfigs, see KAFKA-13863 - testDescribeCluster and testMetadataRefresh, currently we don't get the real controller in KRaft mode so the test may not run as expected This patch also changes the exception type raised from invalid `IncrementalAlterConfig` requests with the `SUBTRACT` and `APPEND` operations. When the configuration value type is not a list, we now raise `INVALID_CONFIG` instead of `INVALID_REQUEST`. Reviewers: Luke Chen , Jason Gustafson --- .../kafka/server/ConfigAdminManager.scala | 4 +- ...minClientWithPoliciesIntegrationTest.scala | 16 +- .../api/PlaintextAdminIntegrationTest.scala | 558 ++++++++++-------- 3 files changed, 324 insertions(+), 254 deletions(-) diff --git a/core/src/main/scala/kafka/server/ConfigAdminManager.scala b/core/src/main/scala/kafka/server/ConfigAdminManager.scala index e7d6c33ab2638..cc7a98179dd4e 100644 --- a/core/src/main/scala/kafka/server/ConfigAdminManager.scala +++ b/core/src/main/scala/kafka/server/ConfigAdminManager.scala @@ -494,7 +494,7 @@ object ConfigAdminManager { case OpType.DELETE => configProps.remove(alterConfigOp.configEntry.name) case OpType.APPEND => { if (!listType(alterConfigOp.configEntry.name, configKeys)) - throw new InvalidRequestException(s"Config value append is not allowed for config key: ${alterConfigOp.configEntry.name}") + throw new InvalidConfigurationException(s"Config value append is not allowed for config key: ${alterConfigOp.configEntry.name}") val oldValueList = Option(configProps.getProperty(alterConfigOp.configEntry.name)) .orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue, ConfigDef.Type.LIST))) .getOrElse("") @@ -505,7 +505,7 @@ object ConfigAdminManager { } case OpType.SUBTRACT => { if (!listType(alterConfigOp.configEntry.name, configKeys)) - throw new InvalidRequestException(s"Config value subtract is not allowed for config key: ${alterConfigOp.configEntry.name}") + throw new InvalidConfigurationException(s"Config value subtract is not allowed for config key: ${alterConfigOp.configEntry.name}") val oldValueList = Option(configProps.getProperty(alterConfigOp.configEntry.name)) .orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue, ConfigDef.Type.LIST))) .getOrElse("") diff --git a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala index fb1b0d248db13..c9d40cadb0ac5 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala @@ -15,18 +15,18 @@ package kafka.api import java.util import java.util.Properties -import java.util.concurrent.ExecutionException import kafka.integration.KafkaServerTestHarness import kafka.log.LogConfig import kafka.server.{Defaults, KafkaConfig} +import kafka.utils.TestUtils.assertFutureExceptionTypeEquals import kafka.utils.{Logging, TestInfoUtils, TestUtils} import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigsOptions, Config, ConfigEntry} import org.apache.kafka.common.config.{ConfigResource, TopicConfig} import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException, PolicyViolationException} import org.apache.kafka.common.utils.Utils import org.apache.kafka.server.policy.AlterConfigPolicy -import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows, assertTrue} +import org.junit.jupiter.api.Assertions.{assertEquals, assertNull} import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource @@ -143,10 +143,10 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with ).asJava) assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.values.keySet) - assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException]) + assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1), classOf[PolicyViolationException]) alterResult.values.get(topicResource2).get - assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(topicResource3).get).getCause.isInstanceOf[InvalidConfigurationException]) - assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException]) + assertFutureExceptionTypeEquals(alterResult.values.get(topicResource3), classOf[InvalidConfigurationException]) + assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource), classOf[InvalidRequestException]) // Verify that the second resource was updated and the others were not ensureConsistentKRaftMetadata() @@ -172,10 +172,10 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with ).asJava, new AlterConfigsOptions().validateOnly(true)) assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.values.keySet) - assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException]) + assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1), classOf[PolicyViolationException]) alterResult.values.get(topicResource2).get - assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(topicResource3).get).getCause.isInstanceOf[InvalidConfigurationException]) - assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException]) + assertFutureExceptionTypeEquals(alterResult.values.get(topicResource3), classOf[InvalidConfigurationException]) + assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource), classOf[InvalidRequestException]) // Verify that no resources are updated since validate_only = true ensureConsistentKRaftMetadata() diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 543b3b80cdca3..46cf3c9c4fb13 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -46,7 +46,7 @@ import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceT import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{ConsumerGroupState, ElectionType, TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica, Uuid} import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo} +import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource import org.slf4j.LoggerFactory @@ -87,15 +87,17 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { super.tearDown() } - @Test - def testClose(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testClose(quorum: String): Unit = { val client = Admin.create(createConfig) client.close() client.close() // double close has no effect } - @Test - def testListNodes(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testListNodes(quorum: String): Unit = { client = Admin.create(createConfig) val brokerStrs = bootstrapServers().split(",").toList.sorted var nodeStrs: List[String] = null @@ -106,8 +108,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals(brokerStrs.mkString(","), nodeStrs.mkString(",")) } - @Test - def testAdminClientHandlingBadIPWithoutTimeout(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testAdminClientHandlingBadIPWithoutTimeout(quorum: String): Unit = { val config = createConfig config.put(AdminClientConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, "1000") val returnBadAddressFirst = new HostResolver { @@ -120,8 +123,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { client.describeCluster().nodes().get() } - @Test - def testCreateExistingTopicsThrowTopicExistsException(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCreateExistingTopicsThrowTopicExistsException(quorum: String): Unit = { client = Admin.create(createConfig) val topic = "mytopic" val topics = Seq(topic) @@ -130,14 +134,15 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { client.createTopics(newTopics.asJava).all.get() waitForTopics(client, topics, List()) - val newTopicsWithInvalidRF = Seq(new NewTopic(topic, 1, (servers.size + 1).toShort)) + val newTopicsWithInvalidRF = Seq(new NewTopic(topic, 1, (brokers.size + 1).toShort)) val e = assertThrows(classOf[ExecutionException], () => client.createTopics(newTopicsWithInvalidRF.asJava, new CreateTopicsOptions().validateOnly(true)).all.get()) assertTrue(e.getCause.isInstanceOf[TopicExistsException]) } - @Test - def testDeleteTopicsWithIds(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testDeleteTopicsWithIds(quorum: String): Unit = { client = Admin.create(createConfig) val topics = Seq("mytopic", "mytopic2", "mytopic3") val newTopics = Seq( @@ -154,15 +159,16 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { waitForTopics(client, List(), topics) } - @Test - def testMetadataRefresh(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk")) // KRaft mode will be supported in KAFKA-13910 + def testMetadataRefresh(quorum: String): Unit = { client = Admin.create(createConfig) val topics = Seq("mytopic") val newTopics = Seq(new NewTopic("mytopic", 3, 3.toShort)) client.createTopics(newTopics.asJava).all.get() waitForTopics(client, expectedPresent = topics, expectedMissing = List()) - val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get + val controller = brokers.find(_.config.brokerId == brokers.flatMap(_.metadataCache.getControllerId).head).get controller.shutdown() controller.awaitShutdown() val topicDesc = client.describeTopics(topics.asJava).allTopicNames.get() @@ -172,8 +178,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { /** * describe should not auto create topics */ - @Test - def testDescribeNonExistingTopic(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testDescribeNonExistingTopic(quorum: String): Unit = { client = Admin.create(createConfig) val existingTopic = "existing-topic" @@ -183,18 +190,23 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val nonExistingTopic = "non-existing" val results = client.describeTopics(Seq(nonExistingTopic, existingTopic).asJava).topicNameValues() assertEquals(existingTopic, results.get(existingTopic).get.name) - assertThrows(classOf[ExecutionException], () => results.get(nonExistingTopic).get).getCause.isInstanceOf[UnknownTopicOrPartitionException] - assertEquals(None, zkClient.getTopicPartitionCount(nonExistingTopic)) + assertFutureExceptionTypeEquals(results.get(nonExistingTopic), classOf[UnknownTopicOrPartitionException]) + if (!isKRaftTest()) { + assertEquals(None, zkClient.getTopicPartitionCount(nonExistingTopic)) + } } - @Test - def testDescribeTopicsWithIds(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testDescribeTopicsWithIds(quorum: String): Unit = { client = Admin.create(createConfig) val existingTopic = "existing-topic" client.createTopics(Seq(existingTopic).map(new NewTopic(_, 1, 1.toShort)).asJava).all.get() waitForTopics(client, Seq(existingTopic), List()) - val existingTopicId = zkClient.getTopicIdsForTopics(Set(existingTopic)).values.head + ensureConsistentKRaftMetadata() + + val existingTopicId = brokers.head.metadataCache.getTopicId(existingTopic) val nonExistingTopicId = Uuid.randomUuid() @@ -203,37 +215,48 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertThrows(classOf[ExecutionException], () => results.get(nonExistingTopicId).get).getCause.isInstanceOf[UnknownTopicIdException] } - @Test - def testDescribeCluster(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testDescribeCluster(quorum: String): Unit = { client = Admin.create(createConfig) val result = client.describeCluster val nodes = result.nodes.get() val clusterId = result.clusterId().get() - assertEquals(servers.head.dataPlaneRequestProcessor.clusterId, clusterId) + assertEquals(brokers.head.dataPlaneRequestProcessor.clusterId, clusterId) val controller = result.controller().get() - assertEquals(servers.head.dataPlaneRequestProcessor.metadataCache.getControllerId. - getOrElse(MetadataResponse.NO_CONTROLLER_ID), controller.id()) - val brokers = bootstrapServers().split(",") - assertEquals(brokers.size, nodes.size) + + if (isKRaftTest()) { + // In KRaft, we return a random brokerId as the current controller. + val brokerIds = brokers.map(_.config.brokerId).toSet + assertTrue(brokerIds.contains(controller.id)) + } else { + assertEquals(brokers.head.dataPlaneRequestProcessor.metadataCache.getControllerId. + getOrElse(MetadataResponse.NO_CONTROLLER_ID), controller.id) + } + + val brokerEndpoints = bootstrapServers().split(",") + assertEquals(brokerEndpoints.size, nodes.size) for (node <- nodes.asScala) { val hostStr = s"${node.host}:${node.port}" - assertTrue(brokers.contains(hostStr), s"Unknown host:port pair $hostStr in brokerVersionInfos") + assertTrue(brokerEndpoints.contains(hostStr), s"Unknown host:port pair $hostStr in brokerVersionInfos") } } - @Test - def testDescribeLogDirs(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testDescribeLogDirs(quorum: String): Unit = { client = Admin.create(createConfig) val topic = "topic" val leaderByPartition = createTopic(topic, numPartitions = 10) val partitionsByBroker = leaderByPartition.groupBy { case (_, leaderId) => leaderId }.map { case (k, v) => k -> v.keys.toSeq } - val brokers = (0 until brokerCount).map(Integer.valueOf) - val logDirInfosByBroker = client.describeLogDirs(brokers.asJava).allDescriptions.get + ensureConsistentKRaftMetadata() + val brokerIds = (0 until brokerCount).map(Integer.valueOf) + val logDirInfosByBroker = client.describeLogDirs(brokerIds.asJava).allDescriptions.get (0 until brokerCount).foreach { brokerId => - val server = servers.find(_.config.brokerId == brokerId).get + val server = brokers.find(_.config.brokerId == brokerId).get val expectedPartitions = partitionsByBroker(brokerId) val logDirInfos = logDirInfosByBroker.get(brokerId) val replicaInfos = logDirInfos.asScala.flatMap { case (_, logDirInfo) => @@ -249,36 +272,39 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } } - @Test - def testDescribeReplicaLogDirs(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testDescribeReplicaLogDirs(quorum: String): Unit = { client = Admin.create(createConfig) val topic = "topic" val leaderByPartition = createTopic(topic, numPartitions = 10) val replicas = leaderByPartition.map { case (partition, brokerId) => new TopicPartitionReplica(topic, partition, brokerId) }.toSeq + ensureConsistentKRaftMetadata() val replicaDirInfos = client.describeReplicaLogDirs(replicas.asJavaCollection).all.get replicaDirInfos.forEach { (topicPartitionReplica, replicaDirInfo) => - val server = servers.find(_.config.brokerId == topicPartitionReplica.brokerId()).get + val server = brokers.find(_.config.brokerId == topicPartitionReplica.brokerId()).get val tp = new TopicPartition(topicPartitionReplica.topic(), topicPartitionReplica.partition()) assertEquals(server.logManager.getLog(tp).get.dir.getParent, replicaDirInfo.getCurrentReplicaLogDir) } } - @Test - def testAlterReplicaLogDirs(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testAlterReplicaLogDirs(quorum: String): Unit = { client = Admin.create(createConfig) val topic = "topic" val tp = new TopicPartition(topic, 0) - val randomNums = servers.map(server => server -> Random.nextInt(2)).toMap + val randomNums = brokers.map(server => server -> Random.nextInt(2)).toMap // Generate two mutually exclusive replicaAssignment - val firstReplicaAssignment = servers.map { server => + val firstReplicaAssignment = brokers.map { server => val logDir = new File(server.config.logDirs(randomNums(server))).getAbsolutePath new TopicPartitionReplica(topic, 0, server.config.brokerId) -> logDir }.toMap - val secondReplicaAssignment = servers.map { server => + val secondReplicaAssignment = brokers.map { server => val logDir = new File(server.config.logDirs(1 - randomNums(server))).getAbsolutePath new TopicPartitionReplica(topic, 0, server.config.brokerId) -> logDir }.toMap @@ -292,14 +318,15 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } createTopic(topic, replicationFactor = brokerCount) - servers.foreach { server => + ensureConsistentKRaftMetadata() + brokers.foreach { server => val logDir = server.logManager.getLog(tp).get.dir.getParent assertEquals(firstReplicaAssignment(new TopicPartitionReplica(topic, 0, server.config.brokerId)), logDir) } // Verify that replica can be moved to the specified log directory after the topic has been created client.alterReplicaLogDirs(secondReplicaAssignment.asJava, new AlterReplicaLogDirsOptions).all.get - servers.foreach { server => + brokers.foreach { server => TestUtils.waitUntilTrue(() => { val logDir = server.logManager.getLog(tp).get.dir.getParent secondReplicaAssignment(new TopicPartitionReplica(topic, 0, server.config.brokerId)) == logDir @@ -332,7 +359,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { try { TestUtils.waitUntilTrue(() => numMessages.get > 10, s"only $numMessages messages are produced before timeout. Producer future ${producerFuture.value}") client.alterReplicaLogDirs(firstReplicaAssignment.asJava, new AlterReplicaLogDirsOptions).all.get - servers.foreach { server => + brokers.foreach { server => TestUtils.waitUntilTrue(() => { val logDir = server.logManager.getLog(tp).get.dir.getParent firstReplicaAssignment(new TopicPartitionReplica(topic, 0, server.config.brokerId)) == logDir @@ -347,7 +374,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val finalNumMessages = Await.result(producerFuture, Duration(20, TimeUnit.SECONDS)) // Verify that all messages that are produced can be consumed - val consumerRecords = TestUtils.consumeTopicRecords(servers, topic, finalNumMessages, + val consumerRecords = TestUtils.consumeTopicRecords(brokers, topic, finalNumMessages, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile) consumerRecords.zipWithIndex.foreach { case (consumerRecord, index) => assertEquals(s"xxxxxxxxxxxxxxxxxxxx-$index", new String(consumerRecord.value)) @@ -697,8 +724,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } } - @Test - def testSeekAfterDeleteRecords(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testSeekAfterDeleteRecords(quorum: String): Unit = { createTopic(topic, numPartitions = 2, replicationFactor = brokerCount) client = Admin.create(createConfig) @@ -726,8 +754,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals(10L, consumer.position(topicPartition)) } - @Test - def testLogStartOffsetCheckpoint(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testLogStartOffsetCheckpoint(quorum: String): Unit = { createTopic(topic, numPartitions = 2, replicationFactor = brokerCount) client = Admin.create(createConfig) @@ -765,8 +794,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { }, s"Expected low watermark of the partition to be 5 but got ${lowWatermark.getOrElse("no response within the timeout")}") } - @Test - def testLogStartOffsetAfterDeleteRecords(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testLogStartOffsetAfterDeleteRecords(quorum: String): Unit = { createTopic(topic, numPartitions = 2, replicationFactor = brokerCount) client = Admin.create(createConfig) @@ -782,25 +812,26 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals(3L, lowWatermark) for (i <- 0 until brokerCount) - assertEquals(3, servers(i).replicaManager.localLog(topicPartition).get.logStartOffset) + assertEquals(3, brokers(i).replicaManager.localLog(topicPartition).get.logStartOffset) } - @Test - def testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(quorum: String): Unit = { val leaders = createTopic(topic, replicationFactor = brokerCount) - val followerIndex = if (leaders(0) != servers(0).config.brokerId) 0 else 1 + val followerIndex = if (leaders(0) != brokers(0).config.brokerId) 0 else 1 def waitForFollowerLog(expectedStartOffset: Long, expectedEndOffset: Long): Unit = { - TestUtils.waitUntilTrue(() => servers(followerIndex).replicaManager.localLog(topicPartition) != None, + TestUtils.waitUntilTrue(() => brokers(followerIndex).replicaManager.localLog(topicPartition) != None, "Expected follower to create replica for partition") // wait until the follower discovers that log start offset moved beyond its HW TestUtils.waitUntilTrue(() => { - servers(followerIndex).replicaManager.localLog(topicPartition).get.logStartOffset == expectedStartOffset + brokers(followerIndex).replicaManager.localLog(topicPartition).get.logStartOffset == expectedStartOffset }, s"Expected follower to discover new log start offset $expectedStartOffset") TestUtils.waitUntilTrue(() => { - servers(followerIndex).replicaManager.localLog(topicPartition).get.logEndOffset == expectedEndOffset + brokers(followerIndex).replicaManager.localLog(topicPartition).get.logEndOffset == expectedEndOffset }, s"Expected follower to catch up to log end offset $expectedEndOffset") } @@ -821,7 +852,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { // after the new replica caught up, all replicas should have same log start offset for (i <- 0 until brokerCount) - assertEquals(3, servers(i).replicaManager.localLog(topicPartition).get.logStartOffset) + assertEquals(3, brokers(i).replicaManager.localLog(topicPartition).get.logStartOffset) // kill the same follower again, produce more records, and delete records beyond follower's LOE killBroker(followerIndex) @@ -832,8 +863,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { waitForFollowerLog(expectedStartOffset=117L, expectedEndOffset=200L) } - @Test - def testAlterLogDirsAfterDeleteRecords(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testAlterLogDirsAfterDeleteRecords(quorum: String): Unit = { client = Admin.create(createConfig) createTopic(topic, replicationFactor = brokerCount) val expectedLEO = 100 @@ -845,27 +877,28 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { result.all().get() // make sure we are in the expected state after delete records for (i <- 0 until brokerCount) { - assertEquals(3, servers(i).replicaManager.localLog(topicPartition).get.logStartOffset) - assertEquals(expectedLEO, servers(i).replicaManager.localLog(topicPartition).get.logEndOffset) + assertEquals(3, brokers(i).replicaManager.localLog(topicPartition).get.logStartOffset) + assertEquals(expectedLEO, brokers(i).replicaManager.localLog(topicPartition).get.logEndOffset) } // we will create another dir just for one server - val futureLogDir = servers(0).config.logDirs(1) - val futureReplica = new TopicPartitionReplica(topic, 0, servers(0).config.brokerId) + val futureLogDir = brokers(0).config.logDirs(1) + val futureReplica = new TopicPartitionReplica(topic, 0, brokers(0).config.brokerId) // Verify that replica can be moved to the specified log directory client.alterReplicaLogDirs(Map(futureReplica -> futureLogDir).asJava).all.get TestUtils.waitUntilTrue(() => { - futureLogDir == servers(0).logManager.getLog(topicPartition).get.dir.getParent + futureLogDir == brokers(0).logManager.getLog(topicPartition).get.dir.getParent }, "timed out waiting for replica movement") // once replica moved, its LSO and LEO should match other replicas - assertEquals(3, servers.head.replicaManager.localLog(topicPartition).get.logStartOffset) - assertEquals(expectedLEO, servers.head.replicaManager.localLog(topicPartition).get.logEndOffset) + assertEquals(3, brokers.head.replicaManager.localLog(topicPartition).get.logStartOffset) + assertEquals(expectedLEO, brokers.head.replicaManager.localLog(topicPartition).get.logEndOffset) } - @Test - def testOffsetsForTimesAfterDeleteRecords(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testOffsetsForTimesAfterDeleteRecords(quorum: String): Unit = { createTopic(topic, numPartitions = 2, replicationFactor = brokerCount) client = Admin.create(createConfig) @@ -886,8 +919,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertNull(consumer.offsetsForTimes(Map(topicPartition -> JLong.valueOf(0L)).asJava).get(topicPartition)) } - @Test - def testConsumeAfterDeleteRecords(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testConsumeAfterDeleteRecords(quorum: String): Unit = { val consumer = createConsumer() subscribeAndWaitForAssignment(topic, consumer) @@ -909,8 +943,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { TestUtils.consumeRecords(consumer, 2) } - @Test - def testDeleteRecordsWithException(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testDeleteRecordsWithException(quorum: String): Unit = { val consumer = createConsumer() subscribeAndWaitForAssignment(topic, consumer) @@ -934,8 +969,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals(classOf[LeaderNotAvailableException], cause.getClass) } - @Test - def testDescribeConfigsForTopic(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testDescribeConfigsForTopic(quorum: String): Unit = { createTopic(topic, numPartitions = 2, replicationFactor = brokerCount) client = Admin.create(createConfig) @@ -982,8 +1018,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { * Also see [[kafka.api.SaslSslAdminIntegrationTest.testAclOperations()]] for tests of ACL operations * when the authorizer is enabled. */ - @Test - def testAclOperations(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testAclOperations(quorum: String): Unit = { val acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "mytopic3", PatternType.LITERAL), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)) client = Admin.create(createConfig) @@ -998,8 +1035,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { * Test closing the AdminClient with a generous timeout. Calls in progress should be completed, * since they can be done within the timeout. New calls should receive timeouts. */ - @Test - def testDelayedClose(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testDelayedClose(quorum: String): Unit = { client = Admin.create(createConfig) val topics = Seq("mytopic", "mytopic2") val newTopics = topics.map(new NewTopic(_, 1, 1.toShort)) @@ -1015,8 +1053,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { * Test closing the AdminClient with a timeout of 0, when there are calls with extremely long * timeouts in progress. The calls should be aborted after the hard shutdown timeout elapses. */ - @Test - def testForceClose(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testForceClose(quorum: String): Unit = { val config = createConfig config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:${TestUtils.IncorrectBrokerPort}") client = Admin.create(config) @@ -1032,8 +1071,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { * Check that a call with a timeout does not complete before the minimum timeout has elapsed, * even when the default request timeout is shorter. */ - @Test - def testMinimumRequestTimeouts(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testMinimumRequestTimeouts(quorum: String): Unit = { val config = createConfig config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:${TestUtils.IncorrectBrokerPort}") config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "0") @@ -1049,8 +1089,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { /** * Test injecting timeouts for calls that are in flight. */ - @Test - def testCallInFlightTimeouts(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testCallInFlightTimeouts(quorum: String): Unit = { val config = createConfig config.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100000000") config.put(AdminClientConfig.RETRIES_CONFIG, "0") @@ -1068,8 +1109,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { /** * Test the consumer group APIs. */ - @Test - def testConsumerGroups(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testConsumerGroups(quorum: String): Unit = { val config = createConfig client = Admin.create(config) try { @@ -1287,8 +1329,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } } - @Test - def testDeleteConsumerGroupOffsets(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testDeleteConsumerGroupOffsets(quorum: String): Unit = { val config = createConfig client = Admin.create(config) try { @@ -1359,8 +1402,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } } - @Test - def testElectPreferredLeaders(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testElectPreferredLeaders(quorum: String): Unit = { client = Admin.create(createConfig) val prefer0 = Seq(0, 1, 2) @@ -1368,10 +1412,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val prefer2 = Seq(2, 0, 1) val partition1 = new TopicPartition("elect-preferred-leaders-topic-1", 0) - TestUtils.createTopic(zkClient, partition1.topic, Map[Int, Seq[Int]](partition1.partition -> prefer0), servers) + createTopicWithAssignment(partition1.topic, Map[Int, Seq[Int]](partition1.partition -> prefer0)) val partition2 = new TopicPartition("elect-preferred-leaders-topic-2", 0) - TestUtils.createTopic(zkClient, partition2.topic, Map[Int, Seq[Int]](partition2.partition -> prefer0), servers) + createTopicWithAssignment(partition2.topic, Map[Int, Seq[Int]](partition2.partition -> prefer0)) def preferredLeader(topicPartition: TopicPartition): Int = { val partitionMetadata = getTopicMetadata(client, topicPartition.topic).partitions.get(topicPartition.partition) @@ -1380,19 +1424,18 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } /** Changes the preferred leader without changing the current leader. */ - def changePreferredLeader(newAssignment: Seq[Int]) = { + def changePreferredLeader(newAssignment: Seq[Int]): Unit = { val preferred = newAssignment.head - val prior1 = zkClient.getLeaderForPartition(partition1).get - val prior2 = zkClient.getLeaderForPartition(partition2).get - - var m = Map.empty[TopicPartition, Seq[Int]] + val prior1 = brokers.head.metadataCache.getPartitionLeaderEndpoint(partition1.topic, partition1.partition(), listenerName).get.id() + val prior2 = brokers.head.metadataCache.getPartitionLeaderEndpoint(partition2.topic, partition2.partition(), listenerName).get.id() + var m = Map.empty[TopicPartition, Optional[NewPartitionReassignment]] if (prior1 != preferred) - m += partition1 -> newAssignment + m += partition1 -> Optional.of(new NewPartitionReassignment(newAssignment.map(Int.box).asJava)) if (prior2 != preferred) - m += partition2 -> newAssignment + m += partition2 -> Optional.of(new NewPartitionReassignment(newAssignment.map(Int.box).asJava)) + client.alterPartitionReassignments(m.asJava).all().get() - zkClient.createPartitionReassignment(m) TestUtils.waitUntilTrue( () => preferredLeader(partition1) == preferred && preferredLeader(partition2) == preferred, s"Expected preferred leader to become $preferred, but is ${preferredLeader(partition1)} and ${preferredLeader(partition2)}", @@ -1408,7 +1451,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { // Noop election var electResult = client.electLeaders(ElectionType.PREFERRED, Set(partition1).asJava) - var exception = electResult.partitions.get.get(partition1).get + val exception = electResult.partitions.get.get(partition1).get assertEquals(classOf[ElectionNotNeededException], exception.getClass) TestUtils.assertLeader(client, partition1, 0) @@ -1437,13 +1480,24 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertFalse(electResult.partitions.get.get(partition2).isPresent) TestUtils.assertLeader(client, partition2, 1) + def assertUnknownTopicOrPartition( + topicPartition: TopicPartition, + result: ElectLeadersResult + ): Unit = { + val exception = result.partitions.get.get(topicPartition).get + assertEquals(classOf[UnknownTopicOrPartitionException], exception.getClass) + if (isKRaftTest()) { + assertEquals(s"No such topic as ${topicPartition.topic()}", exception.getMessage) + } else { + assertEquals("The partition does not exist.", exception.getMessage) + } + } + // unknown topic val unknownPartition = new TopicPartition("topic-does-not-exist", 0) electResult = client.electLeaders(ElectionType.PREFERRED, Set(unknownPartition).asJava) assertEquals(Set(unknownPartition).asJava, electResult.partitions.get.keySet) - exception = electResult.partitions.get.get(unknownPartition).get - assertEquals(classOf[UnknownTopicOrPartitionException], exception.getClass) - assertEquals("The partition does not exist.", exception.getMessage) + assertUnknownTopicOrPartition(unknownPartition, electResult) TestUtils.assertLeader(client, partition1, 1) TestUtils.assertLeader(client, partition2, 1) @@ -1455,9 +1509,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals(Set(unknownPartition, partition1).asJava, electResult.partitions.get.keySet) TestUtils.assertLeader(client, partition1, 2) TestUtils.assertLeader(client, partition2, 1) - exception = electResult.partitions.get.get(unknownPartition).get - assertEquals(classOf[UnknownTopicOrPartitionException], exception.getClass) - assertEquals("The partition does not exist.", exception.getMessage) + assertUnknownTopicOrPartition(unknownPartition, electResult) // elect preferred leader for partition 2 electResult = client.electLeaders(ElectionType.PREFERRED, Set(partition2).asJava) @@ -1468,41 +1520,48 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { // Now change the preferred leader to 1 changePreferredLeader(prefer1) // but shut it down... - servers(1).shutdown() + brokers(1).shutdown() TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2), Set(1)) + def assertPreferredLeaderNotAvailable( + topicPartition: TopicPartition, + result: ElectLeadersResult + ): Unit = { + val exception = result.partitions.get.get(topicPartition).get + assertEquals(classOf[PreferredLeaderNotAvailableException], exception.getClass) + if (isKRaftTest()) { + assertTrue(exception.getMessage.contains( + "The preferred leader was not available."), + s"Unexpected message: ${exception.getMessage}") + } else { + assertTrue(exception.getMessage.contains( + s"Failed to elect leader for partition $topicPartition under strategy PreferredReplicaPartitionLeaderElectionStrategy"), + s"Unexpected message: ${exception.getMessage}") + } + } + // ... now what happens if we try to elect the preferred leader and it's down? val shortTimeout = new ElectLeadersOptions().timeoutMs(10000) electResult = client.electLeaders(ElectionType.PREFERRED, Set(partition1).asJava, shortTimeout) assertEquals(Set(partition1).asJava, electResult.partitions.get.keySet) - exception = electResult.partitions.get.get(partition1).get - assertEquals(classOf[PreferredLeaderNotAvailableException], exception.getClass) - assertTrue(exception.getMessage.contains( - "Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"), - s"Wrong message ${exception.getMessage}") + + assertPreferredLeaderNotAvailable(partition1, electResult) TestUtils.assertLeader(client, partition1, 2) // preferred leader unavailable with null argument electResult = client.electLeaders(ElectionType.PREFERRED, null, shortTimeout) + assertTrue(Set(partition1, partition2).subsetOf(electResult.partitions.get.keySet.asScala)) - exception = electResult.partitions.get.get(partition1).get - assertEquals(classOf[PreferredLeaderNotAvailableException], exception.getClass) - assertTrue(exception.getMessage.contains( - "Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"), - s"Wrong message ${exception.getMessage}") - - exception = electResult.partitions.get.get(partition2).get - assertEquals(classOf[PreferredLeaderNotAvailableException], exception.getClass) - assertTrue(exception.getMessage.contains( - "Failed to elect leader for partition elect-preferred-leaders-topic-2-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"), - s"Wrong message ${exception.getMessage}") - + assertPreferredLeaderNotAvailable(partition1, electResult) TestUtils.assertLeader(client, partition1, 2) + + assertPreferredLeaderNotAvailable(partition2, electResult) TestUtils.assertLeader(client, partition2, 2) } - @Test - def testElectUncleanLeadersForOnePartition(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testElectUncleanLeadersForOnePartition(quorum: String): Unit = { // Case: unclean leader election with one topic partition client = Admin.create(createConfig) @@ -1511,23 +1570,24 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val assignment1 = Seq(broker1, broker2) val partition1 = new TopicPartition("unclean-test-topic-1", 0) - TestUtils.createTopic(zkClient, partition1.topic, Map[Int, Seq[Int]](partition1.partition -> assignment1), servers) + createTopicWithAssignment(partition1.topic, Map[Int, Seq[Int]](partition1.partition -> assignment1)) TestUtils.assertLeader(client, partition1, broker1) - servers(broker2).shutdown() + brokers(broker2).shutdown() TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2)) - servers(broker1).shutdown() + brokers(broker1).shutdown() TestUtils.assertNoLeader(client, partition1) - servers(broker2).startup() + brokers(broker2).startup() val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1).asJava) assertFalse(electResult.partitions.get.get(partition1).isPresent) TestUtils.assertLeader(client, partition1, broker2) } - @Test - def testElectUncleanLeadersForManyPartitions(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testElectUncleanLeadersForManyPartitions(quorum: String): Unit = { // Case: unclean leader election with many topic partitions client = Admin.create(createConfig) @@ -1540,22 +1600,20 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val partition1 = new TopicPartition(topic, 0) val partition2 = new TopicPartition(topic, 1) - TestUtils.createTopic( - zkClient, + createTopicWithAssignment( topic, - Map(partition1.partition -> assignment1, partition2.partition -> assignment2), - servers + Map(partition1.partition -> assignment1, partition2.partition -> assignment2) ) TestUtils.assertLeader(client, partition1, broker1) TestUtils.assertLeader(client, partition2, broker1) - servers(broker2).shutdown() + brokers(broker2).shutdown() TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2), Set(broker2)) - servers(broker1).shutdown() + brokers(broker1).shutdown() TestUtils.assertNoLeader(client, partition1) TestUtils.assertNoLeader(client, partition2) - servers(broker2).startup() + brokers(broker2).startup() val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1, partition2).asJava) assertFalse(electResult.partitions.get.get(partition1).isPresent) @@ -1564,8 +1622,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { TestUtils.assertLeader(client, partition2, broker2) } - @Test - def testElectUncleanLeadersForAllPartitions(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testElectUncleanLeadersForAllPartitions(quorum: String): Unit = { // Case: noop unclean leader election and valid unclean leader election for all partitions client = Admin.create(createConfig) @@ -1579,22 +1638,20 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val partition1 = new TopicPartition(topic, 0) val partition2 = new TopicPartition(topic, 1) - TestUtils.createTopic( - zkClient, + createTopicWithAssignment( topic, - Map(partition1.partition -> assignment1, partition2.partition -> assignment2), - servers + Map(partition1.partition -> assignment1, partition2.partition -> assignment2) ) TestUtils.assertLeader(client, partition1, broker1) TestUtils.assertLeader(client, partition2, broker1) - servers(broker2).shutdown() + brokers(broker2).shutdown() TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2)) - servers(broker1).shutdown() + brokers(broker1).shutdown() TestUtils.assertNoLeader(client, partition1) TestUtils.assertLeader(client, partition2, broker3) - servers(broker2).startup() + brokers(broker2).startup() val electResult = client.electLeaders(ElectionType.UNCLEAN, null) assertFalse(electResult.partitions.get.get(partition1).isPresent) @@ -1603,8 +1660,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { TestUtils.assertLeader(client, partition2, broker3) } - @Test - def testElectUncleanLeadersForUnknownPartitions(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testElectUncleanLeadersForUnknownPartitions(quorum: String): Unit = { // Case: unclean leader election for unknown topic client = Admin.create(createConfig) @@ -1616,11 +1674,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val unknownPartition = new TopicPartition(topic, 1) val unknownTopic = new TopicPartition("unknown-topic", 0) - TestUtils.createTopic( - zkClient, + createTopicWithAssignment( topic, - Map(0 -> assignment1), - servers + Map(0 -> assignment1) ) TestUtils.assertLeader(client, new TopicPartition(topic, 0), broker1) @@ -1630,8 +1686,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertTrue(electResult.partitions.get.get(unknownTopic).get.isInstanceOf[UnknownTopicOrPartitionException]) } - @Test - def testElectUncleanLeadersWhenNoLiveBrokers(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testElectUncleanLeadersWhenNoLiveBrokers(quorum: String): Unit = { // Case: unclean leader election with no live brokers client = Admin.create(createConfig) @@ -1642,26 +1699,25 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val topic = "unclean-test-topic-1" val partition1 = new TopicPartition(topic, 0) - TestUtils.createTopic( - zkClient, + createTopicWithAssignment( topic, - Map(partition1.partition -> assignment1), - servers + Map(partition1.partition -> assignment1) ) TestUtils.assertLeader(client, partition1, broker1) - servers(broker2).shutdown() + brokers(broker2).shutdown() TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2)) - servers(broker1).shutdown() + brokers(broker1).shutdown() TestUtils.assertNoLeader(client, partition1) val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1).asJava) assertTrue(electResult.partitions.get.get(partition1).get.isInstanceOf[EligibleLeadersNotAvailableException]) } - @Test - def testElectUncleanLeadersNoop(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testElectUncleanLeadersNoop(quorum: String): Unit = { // Case: noop unclean leader election with explicit topic partitions client = Admin.create(createConfig) @@ -1672,25 +1728,24 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val topic = "unclean-test-topic-1" val partition1 = new TopicPartition(topic, 0) - TestUtils.createTopic( - zkClient, + createTopicWithAssignment( topic, - Map(partition1.partition -> assignment1), - servers + Map(partition1.partition -> assignment1) ) TestUtils.assertLeader(client, partition1, broker1) - servers(broker1).shutdown() + brokers(broker1).shutdown() TestUtils.assertLeader(client, partition1, broker2) - servers(broker1).startup() + brokers(broker1).startup() val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1).asJava) assertTrue(electResult.partitions.get.get(partition1).get.isInstanceOf[ElectionNotNeededException]) } - @Test - def testElectUncleanLeadersAndNoop(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testElectUncleanLeadersAndNoop(quorum: String): Unit = { // Case: one noop unclean leader election and one valid unclean leader election client = Admin.create(createConfig) @@ -1704,22 +1759,20 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val partition1 = new TopicPartition(topic, 0) val partition2 = new TopicPartition(topic, 1) - TestUtils.createTopic( - zkClient, + createTopicWithAssignment( topic, - Map(partition1.partition -> assignment1, partition2.partition -> assignment2), - servers + Map(partition1.partition -> assignment1, partition2.partition -> assignment2) ) TestUtils.assertLeader(client, partition1, broker1) TestUtils.assertLeader(client, partition2, broker1) - servers(broker2).shutdown() + brokers(broker2).shutdown() TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2)) - servers(broker1).shutdown() + brokers(broker1).shutdown() TestUtils.assertNoLeader(client, partition1) TestUtils.assertLeader(client, partition2, broker3) - servers(broker2).startup() + brokers(broker2).startup() val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1, partition2).asJava) assertFalse(electResult.partitions.get.get(partition1).isPresent) @@ -1728,8 +1781,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { TestUtils.assertLeader(client, partition2, broker3) } - @Test - def testListReassignmentsDoesNotShowNonReassigningPartitions(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testListReassignmentsDoesNotShowNonReassigningPartitions(quorum: String): Unit = { client = Admin.create(createConfig) // Create topics @@ -1744,8 +1798,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals(0, allReassignmentsMap.size()) } - @Test - def testListReassignmentsDoesNotShowDeletedPartitions(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testListReassignmentsDoesNotShowDeletedPartitions(quorum: String): Unit = { client = Admin.create(createConfig) val topic = "list-reassignments-no-reassignments" @@ -1925,8 +1980,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals(appendValues, configs.get(topicResource).get(LogConfig.LeaderReplicationThrottledReplicasProp).value) } - @Test - def testIncrementalAlterConfigsDeleteAndSetBrokerConfigs(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testIncrementalAlterConfigsDeleteAndSetBrokerConfigs(quorum: String): Unit = { client = Admin.create(createConfig) val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0") client.incrementalAlterConfigs(Map(broker0Resource -> @@ -1937,9 +1993,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { ).asJavaCollection).asJava).all().get() TestUtils.waitUntilTrue(() => { val broker0Configs = client.describeConfigs(Seq(broker0Resource).asJava). - all().get().get(broker0Resource).entries().asScala.map { - case entry => (entry.name, entry.value) - }.toMap + all().get().get(broker0Resource).entries().asScala.map(entry => (entry.name, entry.value)).toMap ("123".equals(broker0Configs.getOrElse(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "")) && "456".equals(broker0Configs.getOrElse(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, ""))) }, "Expected to see the broker properties we just set", pause=25) @@ -1953,17 +2007,16 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { ).asJavaCollection).asJava).all().get() TestUtils.waitUntilTrue(() => { val broker0Configs = client.describeConfigs(Seq(broker0Resource).asJava). - all().get().get(broker0Resource).entries().asScala.map { - case entry => (entry.name, entry.value) - }.toMap + all().get().get(broker0Resource).entries().asScala.map(entry => (entry.name, entry.value)).toMap ("".equals(broker0Configs.getOrElse(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "")) && "654".equals(broker0Configs.getOrElse(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, "")) && "987".equals(broker0Configs.getOrElse(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp, ""))) }, "Expected to see the broker properties we just modified", pause=25) } - @Test - def testIncrementalAlterConfigsDeleteBrokerConfigs(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testIncrementalAlterConfigsDeleteBrokerConfigs(quorum: String): Unit = { client = Admin.create(createConfig) val broker0Resource = new ConfigResource(ConfigResource.Type.BROKER, "0") client.incrementalAlterConfigs(Map(broker0Resource -> @@ -1976,9 +2029,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { ).asJavaCollection).asJava).all().get() TestUtils.waitUntilTrue(() => { val broker0Configs = client.describeConfigs(Seq(broker0Resource).asJava). - all().get().get(broker0Resource).entries().asScala.map { - case entry => (entry.name, entry.value) - }.toMap + all().get().get(broker0Resource).entries().asScala.map(entry => (entry.name, entry.value)).toMap ("123".equals(broker0Configs.getOrElse(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "")) && "456".equals(broker0Configs.getOrElse(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, "")) && "789".equals(broker0Configs.getOrElse(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp, ""))) @@ -1993,17 +2044,16 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { ).asJavaCollection).asJava).all().get() TestUtils.waitUntilTrue(() => { val broker0Configs = client.describeConfigs(Seq(broker0Resource).asJava). - all().get().get(broker0Resource).entries().asScala.map { - case entry => (entry.name, entry.value) - }.toMap + all().get().get(broker0Resource).entries().asScala.map(entry => (entry.name, entry.value)).toMap ("".equals(broker0Configs.getOrElse(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "")) && "".equals(broker0Configs.getOrElse(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, "")) && "".equals(broker0Configs.getOrElse(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp, ""))) }, "Expected to see the broker properties we just removed to be deleted", pause=25) } - @Test - def testInvalidIncrementalAlterConfigs(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testInvalidIncrementalAlterConfigs(quorum: String): Unit = { client = Admin.create(createConfig) // Create topics @@ -2015,14 +2065,14 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val topic2Resource = new ConfigResource(ConfigResource.Type.TOPIC, topic2) createTopic(topic2) - //Add duplicate Keys for topic1 + // Add duplicate Keys for topic1 var topic1AlterConfigs = Seq( new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.75"), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.65"), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry(LogConfig.CompressionTypeProp, "gzip"), AlterConfigOp.OpType.SET) // valid entry ).asJavaCollection - //Add valid config for topic2 + // Add valid config for topic2 var topic2AlterConfigs = Seq( new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.9"), AlterConfigOp.OpType.SET) ).asJavaCollection @@ -2033,12 +2083,13 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { ).asJava) assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet) - //InvalidRequestException error for topic1 + // InvalidRequestException error for topic1 assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException], Some("Error due to duplicate config keys")) - //operation should succeed for topic2 + // Operation should succeed for topic2 alterResult.values().get(topic2Resource).get() + ensureConsistentKRaftMetadata() // Verify that topic1 is not config not updated, and topic2 config is updated val describeResult = client.describeConfigs(Seq(topic1Resource, topic2Resource).asJava) @@ -2046,10 +2097,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals(2, configs.size) assertEquals(Defaults.LogCleanerMinCleanRatio.toString, configs.get(topic1Resource).get(LogConfig.MinCleanableDirtyRatioProp).value) - assertEquals(Defaults.CompressionType.toString, configs.get(topic1Resource).get(LogConfig.CompressionTypeProp).value) + assertEquals(Defaults.CompressionType, configs.get(topic1Resource).get(LogConfig.CompressionTypeProp).value) assertEquals("0.9", configs.get(topic2Resource).get(LogConfig.MinCleanableDirtyRatioProp).value) - //check invalid use of append/subtract operation types + // Check invalid use of append/subtract operation types topic1AlterConfigs = Seq( new AlterConfigOp(new ConfigEntry(LogConfig.CompressionTypeProp, "gzip"), AlterConfigOp.OpType.APPEND) ).asJavaCollection @@ -2064,14 +2115,21 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { ).asJava) assertEquals(Set(topic1Resource, topic2Resource).asJava, alterResult.values.keySet) - assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException], - Some("Config value append is not allowed for config")) - - assertFutureExceptionTypeEquals(alterResult.values().get(topic2Resource), classOf[InvalidRequestException], - Some("Config value subtract is not allowed for config")) + assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidConfigurationException], + if (isKRaftTest()) { + Some("Can't APPEND to key compression.type because its type is not LIST.") + } else { + Some("Config value append is not allowed for config") + }) + assertFutureExceptionTypeEquals(alterResult.values().get(topic2Resource), classOf[InvalidConfigurationException], + if (isKRaftTest()) { + Some("Can't SUBTRACT to key compression.type because its type is not LIST.") + } else { + Some("Config value subtract is not allowed for config") + }) - //try to add invalid config + // Try to add invalid config topic1AlterConfigs = Seq( new AlterConfigOp(new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "1.1"), AlterConfigOp.OpType.SET) ).asJavaCollection @@ -2082,11 +2140,16 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals(Set(topic1Resource).asJava, alterResult.values.keySet) assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidConfigurationException], - Some("Invalid config value for resource")) + if (isKRaftTest()) { + Some("Invalid value 1.1 for configuration min.cleanable.dirty.ratio: Value must be no more than 1") + } else { + Some("Invalid config value for resource") + }) } - @Test - def testInvalidAlterPartitionReassignments(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testInvalidAlterPartitionReassignments(quorum: String): Unit = { client = Admin.create(createConfig) val topic = "alter-reassignments-topic-1" val tp1 = new TopicPartition(topic, 0) @@ -2124,8 +2187,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp3), classOf[InvalidReplicaAssignmentException]) } - @Test - def testLongTopicNames(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testLongTopicNames(quorum: String): Unit = { val client = Admin.create(createConfig) val longTopicName = String.join("", Collections.nCopies(249, "x")); val invalidTopicName = String.join("", Collections.nCopies(250, "x")); @@ -2137,14 +2201,15 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertTrue(results.containsKey(invalidTopicName)) assertFutureExceptionTypeEquals(results.get(invalidTopicName), classOf[InvalidTopicException]) assertFutureExceptionTypeEquals(client.alterReplicaLogDirs( - Map(new TopicPartitionReplica(longTopicName, 0, 0) -> servers(0).config.logDirs(0)).asJava).all(), + Map(new TopicPartitionReplica(longTopicName, 0, 0) -> brokers(0).config.logDirs(0)).asJava).all(), classOf[InvalidTopicException]) client.close() } // Verify that createTopics and alterConfigs fail with null values - @Test - def testNullConfigs(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk")) + def testNullConfigs(quorum: String): Unit = { def validateLogConfig(compressionType: String): Unit = { val logConfig = zkClient.getLogConfigs(Set(topic), Collections.emptyMap[String, AnyRef])._1(topic) @@ -2182,8 +2247,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { validateLogConfig(compressionType = "producer") } - @Test - def testDescribeConfigsForLog4jLogLevels(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testDescribeConfigsForLog4jLogLevels(quorum: String): Unit = { client = Admin.create(createConfig) LoggerFactory.getLogger("kafka.cluster.Replica").trace("Message to create the logger") val loggerConfig = describeBrokerLoggers() @@ -2198,9 +2264,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertTrue(logCleanerLogLevelConfig.synonyms().isEmpty) } - @Test + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) @Disabled // To be re-enabled once KAFKA-8779 is resolved - def testIncrementalAlterConfigsForLog4jLogLevels(): Unit = { + def testIncrementalAlterConfigsForLog4jLogLevels(quorum: String): Unit = { client = Admin.create(createConfig) val initialLoggerConfig = describeBrokerLoggers() @@ -2262,9 +2329,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { * 4. Change ROOT logger to ERROR * 5. Ensure the kafka.controller.KafkaController logger's level is ERROR (the curent root logger level) */ - @Test + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) @Disabled // To be re-enabled once KAFKA-8779 is resolved - def testIncrementalAlterConfigsForLog4jLogLevelsCanResetLoggerToCurrentRoot(): Unit = { + def testIncrementalAlterConfigsForLog4jLogLevelsCanResetLoggerToCurrentRoot(quorum: String): Unit = { client = Admin.create(createConfig) // step 1 - configure root logger val initialRootLogLevel = LogLevelConfig.TRACE_LOG_LEVEL @@ -2304,9 +2372,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals(newRootLogLevel, newRootLoggerConfig.get("kafka.controller.KafkaController").value()) } - @Test - @Disabled // To be re-enabled once KAFKA-8779 is resolved - def testIncrementalAlterConfigsForLog4jLogLevelsCannotResetRootLogger(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + @Disabled // Zk to be re-enabled once KAFKA-8779 is resolved + def testIncrementalAlterConfigsForLog4jLogLevelsCannotResetRootLogger(quorum: String): Unit = { client = Admin.create(createConfig) val deleteRootLoggerEntry = Seq( new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, ""), AlterConfigOp.OpType.DELETE) @@ -2315,9 +2384,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertTrue(assertThrows(classOf[ExecutionException], () => alterBrokerLoggers(deleteRootLoggerEntry)).getCause.isInstanceOf[InvalidRequestException]) } - @Test + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) @Disabled // To be re-enabled once KAFKA-8779 is resolved - def testIncrementalAlterConfigsForLog4jLogLevelsDoesNotWorkWithInvalidConfigs(): Unit = { + def testIncrementalAlterConfigsForLog4jLogLevelsDoesNotWorkWithInvalidConfigs(quorum: String): Unit = { client = Admin.create(createConfig) val validLoggerName = "kafka.server.KafkaRequestHandler" val expectedValidLoggerLogLevel = describeBrokerLoggers().get(validLoggerName) @@ -2359,9 +2429,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { * The AlterConfigs API is deprecated and should not support altering log levels */ @nowarn("cat=deprecation") - @Test - @Disabled // To be re-enabled once KAFKA-8779 is resolved - def testAlterConfigsForLog4jLogLevelsDoesNotWork(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("kraft")) // Zk to be re-enabled once KAFKA-8779 is resolved + def testAlterConfigsForLog4jLogLevelsDoesNotWork(quorum: String): Unit = { client = Admin.create(createConfig) val alterLogLevelsEntries = Seq( @@ -2591,9 +2661,9 @@ object PlaintextAdminIntegrationTest { ).asJava) assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.values.keySet) - assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(topicResource1).get).getCause.isInstanceOf[InvalidConfigurationException]) + assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1), classOf[InvalidConfigurationException]) alterResult.values.get(topicResource2).get - assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException]) + assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource), classOf[InvalidRequestException]) // Verify that first and third resources were not updated and second was updated test.ensureConsistentKRaftMetadata() @@ -2620,9 +2690,9 @@ object PlaintextAdminIntegrationTest { ).asJava, new AlterConfigsOptions().validateOnly(true)) assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.values.keySet) - assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(topicResource1).get).getCause.isInstanceOf[InvalidConfigurationException]) + assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1), classOf[InvalidConfigurationException]) alterResult.values.get(topicResource2).get - assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException]) + assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource), classOf[InvalidRequestException]) // Verify that no resources are updated since validate_only = true test.ensureConsistentKRaftMetadata()