diff --git a/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala index 26c60e1c3e422..3082babd06fa0 100644 --- a/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala @@ -586,16 +586,10 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi try { killBroker(0) - val aliveServers = brokers.filterNot(_.config.brokerId == 0) - if (isKRaftTest()) { - TestUtils.ensureConsistentKRaftMetadata( - aliveServers, - controllerServer, - "Timeout waiting for partition metadata propagating to brokers" - ) + ensureConsistentKRaftMetadata() } else { - TestUtils.waitForPartitionMetadata(aliveServers, testTopicName, 0) + TestUtils.waitForPartitionMetadata(aliveBrokers, testTopicName, 0) } val output = TestUtils.grabConsoleOutput( topicService.describeTopic(new TopicCommandOptions(Array("--under-replicated-partitions")))) @@ -618,8 +612,14 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi try { killBroker(0) - val aliveServers = brokers.filterNot(_.config.brokerId == 0) - TestUtils.waitForPartitionMetadata(aliveServers, testTopicName, 0) + if (isKRaftTest()) { + ensureConsistentKRaftMetadata() + } else { + TestUtils.waitUntilTrue( + () => aliveBrokers.forall(_.metadataCache.getPartitionInfo(testTopicName, 0).get.isr().size() == 5), + s"Timeout waiting for partition metadata propagating to brokers for $testTopicName topic" + ) + } val output = TestUtils.grabConsoleOutput( topicService.describeTopic(new TopicCommandOptions(Array("--under-min-isr-partitions")))) val rows = output.split("\n") @@ -697,6 +697,16 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi try { killBroker(0) killBroker(1) + + if (isKRaftTest()) { + ensureConsistentKRaftMetadata() + } else { + TestUtils.waitUntilTrue( + () => aliveBrokers.forall(_.metadataCache.getPartitionInfo(testTopicName, 0).get.isr().size() == 4), + s"Timeout waiting for partition metadata propagating to brokers for $testTopicName topic" + ) + } + val output = TestUtils.grabConsoleOutput( topicService.describeTopic(new TopicCommandOptions(Array("--at-min-isr-partitions")))) val rows = output.split("\n") @@ -741,13 +751,11 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi try { killBroker(0) - val aliveServers = brokers.filterNot(_.config.brokerId == 0) - if (isKRaftTest()) { - TestUtils.ensureConsistentKRaftMetadata(aliveServers, controllerServer, "Timeout waiting for topic configs propagating to brokers") + ensureConsistentKRaftMetadata() } else { TestUtils.waitUntilTrue( - () => aliveServers.forall( + () => aliveBrokers.forall( broker => broker.metadataCache.getPartitionInfo(underMinIsrTopic, 0).get.isr().size() < 6 && broker.metadataCache.getPartitionInfo(offlineTopic, 0).get.leader() == MetadataResponse.NO_LEADER_ID), diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index f46713337a7e6..26f4c9d4c2ecf 100755 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -353,10 +353,14 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness { } } + def aliveBrokers: Seq[KafkaBroker] = { + _brokers.filter(broker => alive(broker.config.brokerId)).toSeq + } + def ensureConsistentKRaftMetadata(): Unit = { if (isKRaftTest()) { TestUtils.ensureConsistentKRaftMetadata( - brokers, + aliveBrokers, controllerServer ) } diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala index 9137558437b3a..644f21ff3f648 100644 --- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala @@ -71,9 +71,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest with Logging { val error = response.errorCounts.asScala.find(_._1 != Errors.NONE) assertTrue(error.isEmpty, s"There should be no errors, found ${response.data.responses.asScala}") - if (isKRaftTest()) { - TestUtils.ensureConsistentKRaftMetadata(brokers, controllerServer) - } + ensureConsistentKRaftMetadata() request.data.topicNames.forEach { topic => validateTopicIsDeleted(topic) @@ -85,9 +83,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest with Logging { val error = response.errorCounts.asScala.find(_._1 != Errors.NONE) assertTrue(error.isEmpty, s"There should be no errors, found ${response.data.responses.asScala}") - if (isKRaftTest()) { - TestUtils.ensureConsistentKRaftMetadata(brokers, controllerServer) - } + ensureConsistentKRaftMetadata() response.data.responses.forEach { response => validateTopicIsDeleted(response.name())