Skip to content

Commit

Permalink
KAFKA-13905: Fix failing ServerShutdownTest.testCleanShutdownAfterFai…
Browse files Browse the repository at this point in the history
…ledStartupDueToCorruptLogs (apache#12165)

Reviewers: Jason Gustafson <[email protected]>, Luke Chen <[email protected]>
  • Loading branch information
dengziming authored May 17, 2022
1 parent 972b765 commit 5f039ba
Showing 1 changed file with 23 additions and 11 deletions.
34 changes: 23 additions & 11 deletions core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.{IntegerDeserializer, IntegerSerializer, StringDeserializer, StringSerializer}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.BrokerState
import org.junit.jupiter.api.{BeforeEach, Test, TestInfo, Timeout}
import org.junit.jupiter.api.{BeforeEach, Disabled, Test, TestInfo, Timeout}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
Expand Down Expand Up @@ -144,15 +144,15 @@ class ServerShutdownTest extends KafkaServerTestHarness {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCleanShutdownAfterFailedStartup(quorum: String): Unit = {
if (quorum == "zk") {
propsToChangeUponRestart.setProperty(KafkaConfig.ZkConnectionTimeoutMsProp, "50")
propsToChangeUponRestart.setProperty(KafkaConfig.ZkConnectProp, "some.invalid.hostname.foo.bar.local:65535")
verifyCleanShutdownAfterFailedStartup[ZooKeeperClientTimeoutException](quorum)
} else {
if (isKRaftTest()) {
propsToChangeUponRestart.setProperty(KafkaConfig.InitialBrokerRegistrationTimeoutMsProp, "1000")
shutdownBroker()
shutdownKRaftController()
verifyCleanShutdownAfterFailedStartup[CancellationException](quorum)
verifyCleanShutdownAfterFailedStartup[CancellationException]
} else {
propsToChangeUponRestart.setProperty(KafkaConfig.ZkConnectionTimeoutMsProp, "50")
propsToChangeUponRestart.setProperty(KafkaConfig.ZkConnectProp, "some.invalid.hostname.foo.bar.local:65535")
verifyCleanShutdownAfterFailedStartup[ZooKeeperClientTimeoutException]
}
}

Expand All @@ -165,7 +165,7 @@ class ServerShutdownTest extends KafkaServerTestHarness {
val partitionDir = new File(dirName, s"$topic-0")
partitionDir.listFiles.foreach(f => TestUtils.appendNonsenseToFile(f, TestUtils.random.nextInt(1024) + 1))
}
verifyCleanShutdownAfterFailedStartup[KafkaStorageException](quorum)
verifyCleanShutdownAfterFailedStartup[KafkaStorageException]
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
Expand All @@ -177,6 +177,7 @@ class ServerShutdownTest extends KafkaServerTestHarness {
verifyNonDaemonThreadsStatus()
}

@Disabled
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("kraft"))
def testCleanShutdownWithKRaftControllerUnavailable(quorum: String): Unit = {
Expand All @@ -186,7 +187,7 @@ class ServerShutdownTest extends KafkaServerTestHarness {
verifyNonDaemonThreadsStatus()
}

private def verifyCleanShutdownAfterFailedStartup[E <: Exception](quorum: String)(implicit exceptionClassTag: ClassTag[E]): Unit = {
private def verifyCleanShutdownAfterFailedStartup[E <: Exception](implicit exceptionClassTag: ClassTag[E]): Unit = {
try {
recreateBroker(startup = true)
fail("Expected KafkaServer setup to fail and throw exception")
Expand All @@ -195,13 +196,24 @@ class ServerShutdownTest extends KafkaServerTestHarness {
// identify the correct exception, making sure the server was shutdown, and cleaning up if anything
// goes wrong so that awaitShutdown doesn't hang
case e: Exception =>
assertTrue(exceptionClassTag.runtimeClass.isInstance(e), s"Unexpected exception $e")
assertEquals(if (quorum == "zk") BrokerState.NOT_RUNNING else BrokerState.SHUTTING_DOWN, brokers.head.brokerState)
assertCause(exceptionClassTag.runtimeClass, e)
assertEquals(if (isKRaftTest()) BrokerState.SHUTTING_DOWN else BrokerState.NOT_RUNNING, brokers.head.brokerState)
} finally {
shutdownBroker()
}
}

private def assertCause(expectedClass: Class[_], e: Throwable): Unit = {
var cause = e
while (cause != null) {
if (expectedClass.isInstance(cause)) {
return
}
cause = cause.getCause
}
fail(s"Failed to assert cause of $e, expected cause $expectedClass")
}

private[this] def isNonDaemonKafkaThread(t: Thread): Boolean = {
!t.isDaemon && t.isAlive && t.getName.startsWith(this.getClass.getName)
}
Expand Down

0 comments on commit 5f039ba

Please sign in to comment.