MINOR: Fix flaky test TopicCommandIntegrationTest.testDescribeAtMinIsrPartitions(String).quorum=kraft (apache#12189)

Flaky test, as seen failing in CI: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-12184/1/tests/

The test fails because it does not wait for metadata to be propagated across the brokers after killing a broker, so the describe command may read stale information. Note that a similar fix was made in apache#12104 for a different test.
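
A minimal sketch of the wait the fix adds, assembled from the diff below (aliveBrokers and ensureConsistentKRaftMetadata() are the new test-harness helpers introduced here; testTopicName and the expected ISR size come from the affected test, and the surrounding test class is assumed):

```scala
// Sketch only: the pattern applied after killing a broker, mirroring the diff below.
killBroker(0)
if (isKRaftTest()) {
  // KRaft: wait until the remaining brokers have caught up with the controller's metadata.
  ensureConsistentKRaftMetadata()
} else {
  // ZooKeeper mode: wait until every live broker's metadata cache reflects the shrunken ISR.
  TestUtils.waitUntilTrue(
    () => aliveBrokers.forall(_.metadataCache.getPartitionInfo(testTopicName, 0).get.isr().size() == 5),
    s"Timeout waiting for partition metadata propagating to brokers for $testTopicName topic"
  )
}
```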

Reviewers: Kvicii Y, Ziming Deng, Jason Gustafson <[email protected]>, Guozhang Wang <[email protected]>
divijvaidya authored May 21, 2022
1 parent 3f86a18 commit f6ba10e
Showing 3 changed files with 29 additions and 21 deletions.
@@ -586,16 +586,10 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi

     try {
       killBroker(0)
-      val aliveServers = brokers.filterNot(_.config.brokerId == 0)
-
       if (isKRaftTest()) {
-        TestUtils.ensureConsistentKRaftMetadata(
-          aliveServers,
-          controllerServer,
-          "Timeout waiting for partition metadata propagating to brokers"
-        )
+        ensureConsistentKRaftMetadata()
       } else {
-        TestUtils.waitForPartitionMetadata(aliveServers, testTopicName, 0)
+        TestUtils.waitForPartitionMetadata(aliveBrokers, testTopicName, 0)
       }
       val output = TestUtils.grabConsoleOutput(
         topicService.describeTopic(new TopicCommandOptions(Array("--under-replicated-partitions"))))
@@ -618,8 +612,14 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi

     try {
       killBroker(0)
-      val aliveServers = brokers.filterNot(_.config.brokerId == 0)
-      TestUtils.waitForPartitionMetadata(aliveServers, testTopicName, 0)
+      if (isKRaftTest()) {
+        ensureConsistentKRaftMetadata()
+      } else {
+        TestUtils.waitUntilTrue(
+          () => aliveBrokers.forall(_.metadataCache.getPartitionInfo(testTopicName, 0).get.isr().size() == 5),
+          s"Timeout waiting for partition metadata propagating to brokers for $testTopicName topic"
+        )
+      }
       val output = TestUtils.grabConsoleOutput(
         topicService.describeTopic(new TopicCommandOptions(Array("--under-min-isr-partitions"))))
       val rows = output.split("\n")
@@ -697,6 +697,16 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
     try {
       killBroker(0)
       killBroker(1)
+
+      if (isKRaftTest()) {
+        ensureConsistentKRaftMetadata()
+      } else {
+        TestUtils.waitUntilTrue(
+          () => aliveBrokers.forall(_.metadataCache.getPartitionInfo(testTopicName, 0).get.isr().size() == 4),
+          s"Timeout waiting for partition metadata propagating to brokers for $testTopicName topic"
+        )
+      }
+
       val output = TestUtils.grabConsoleOutput(
         topicService.describeTopic(new TopicCommandOptions(Array("--at-min-isr-partitions"))))
       val rows = output.split("\n")
@@ -741,13 +751,11 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi

     try {
       killBroker(0)
-      val aliveServers = brokers.filterNot(_.config.brokerId == 0)
-
       if (isKRaftTest()) {
-        TestUtils.ensureConsistentKRaftMetadata(aliveServers, controllerServer, "Timeout waiting for topic configs propagating to brokers")
+        ensureConsistentKRaftMetadata()
       } else {
         TestUtils.waitUntilTrue(
-          () => aliveServers.forall(
+          () => aliveBrokers.forall(
             broker =>
               broker.metadataCache.getPartitionInfo(underMinIsrTopic, 0).get.isr().size() < 6 &&
               broker.metadataCache.getPartitionInfo(offlineTopic, 0).get.leader() == MetadataResponse.NO_LEADER_ID),

@@ -353,10 +353,14 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
     }
   }

+  def aliveBrokers: Seq[KafkaBroker] = {
+    _brokers.filter(broker => alive(broker.config.brokerId)).toSeq
+  }
+
   def ensureConsistentKRaftMetadata(): Unit = {
     if (isKRaftTest()) {
       TestUtils.ensureConsistentKRaftMetadata(
-        brokers,
+        aliveBrokers,
         controllerServer
       )
     }
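
For readability, the two harness helpers as they read after this change, assembled from the hunk above (_brokers, alive, controllerServer and isKRaftTest() are pre-existing members of KafkaServerTestHarness):

```scala
// Final shape after this commit, per the hunk above.
def aliveBrokers: Seq[KafkaBroker] = {
  // Only brokers that have not been killed by the test.
  _brokers.filter(broker => alive(broker.config.brokerId)).toSeq
}

def ensureConsistentKRaftMetadata(): Unit = {
  if (isKRaftTest()) {
    // Wait until the live brokers' metadata matches the KRaft controller's.
    TestUtils.ensureConsistentKRaftMetadata(
      aliveBrokers,
      controllerServer
    )
  }
}
```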

@@ -71,9 +71,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest with Logging {
     val error = response.errorCounts.asScala.find(_._1 != Errors.NONE)
     assertTrue(error.isEmpty, s"There should be no errors, found ${response.data.responses.asScala}")

-    if (isKRaftTest()) {
-      TestUtils.ensureConsistentKRaftMetadata(brokers, controllerServer)
-    }
+    ensureConsistentKRaftMetadata()

     request.data.topicNames.forEach { topic =>
       validateTopicIsDeleted(topic)
@@ -85,9 +83,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest with Logging {
     val error = response.errorCounts.asScala.find(_._1 != Errors.NONE)
     assertTrue(error.isEmpty, s"There should be no errors, found ${response.data.responses.asScala}")

-    if (isKRaftTest()) {
-      TestUtils.ensureConsistentKRaftMetadata(brokers, controllerServer)
-    }
+    ensureConsistentKRaftMetadata()

     response.data.responses.forEach { response =>
       validateTopicIsDeleted(response.name())
