From 5f039bae1c74ac465a1a4f47a4721df7f15a4366 Mon Sep 17 00:00:00 2001 From: dengziming Date: Tue, 17 May 2022 16:31:28 +0800 Subject: [PATCH] KAFKA-13905: Fix failing ServerShutdownTest.testCleanShutdownAfterFailedStartupDueToCorruptLogs (#12165) Reviewers: Jason Gustafson , Luke Chen --- .../kafka/server/ServerShutdownTest.scala | 34 +++++++++++++------ 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index 0464a340744a9..08c00bcae6d14 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -39,7 +39,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.serialization.{IntegerDeserializer, IntegerSerializer, StringDeserializer, StringSerializer} import org.apache.kafka.common.utils.Time import org.apache.kafka.metadata.BrokerState -import org.junit.jupiter.api.{BeforeEach, Test, TestInfo, Timeout} +import org.junit.jupiter.api.{BeforeEach, Disabled, Test, TestInfo, Timeout} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource @@ -144,15 +144,15 @@ class ServerShutdownTest extends KafkaServerTestHarness { @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk", "kraft")) def testCleanShutdownAfterFailedStartup(quorum: String): Unit = { - if (quorum == "zk") { - propsToChangeUponRestart.setProperty(KafkaConfig.ZkConnectionTimeoutMsProp, "50") - propsToChangeUponRestart.setProperty(KafkaConfig.ZkConnectProp, "some.invalid.hostname.foo.bar.local:65535") - verifyCleanShutdownAfterFailedStartup[ZooKeeperClientTimeoutException](quorum) - } else { + if (isKRaftTest()) { propsToChangeUponRestart.setProperty(KafkaConfig.InitialBrokerRegistrationTimeoutMsProp, "1000") shutdownBroker() shutdownKRaftController() - verifyCleanShutdownAfterFailedStartup[CancellationException](quorum) + verifyCleanShutdownAfterFailedStartup[CancellationException] + } else { + propsToChangeUponRestart.setProperty(KafkaConfig.ZkConnectionTimeoutMsProp, "50") + propsToChangeUponRestart.setProperty(KafkaConfig.ZkConnectProp, "some.invalid.hostname.foo.bar.local:65535") + verifyCleanShutdownAfterFailedStartup[ZooKeeperClientTimeoutException] } } @@ -165,7 +165,7 @@ class ServerShutdownTest extends KafkaServerTestHarness { val partitionDir = new File(dirName, s"$topic-0") partitionDir.listFiles.foreach(f => TestUtils.appendNonsenseToFile(f, TestUtils.random.nextInt(1024) + 1)) } - verifyCleanShutdownAfterFailedStartup[KafkaStorageException](quorum) + verifyCleanShutdownAfterFailedStartup[KafkaStorageException] } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @@ -177,6 +177,7 @@ class ServerShutdownTest extends KafkaServerTestHarness { verifyNonDaemonThreadsStatus() } + @Disabled @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("kraft")) def testCleanShutdownWithKRaftControllerUnavailable(quorum: String): Unit = { @@ -186,7 +187,7 @@ class ServerShutdownTest extends KafkaServerTestHarness { verifyNonDaemonThreadsStatus() } - private def verifyCleanShutdownAfterFailedStartup[E <: Exception](quorum: String)(implicit exceptionClassTag: ClassTag[E]): Unit = { + private def verifyCleanShutdownAfterFailedStartup[E <: Exception](implicit exceptionClassTag: ClassTag[E]): Unit = { try { recreateBroker(startup = true) fail("Expected KafkaServer setup to fail and throw exception") @@ -195,13 +196,24 @@ class ServerShutdownTest extends KafkaServerTestHarness { // identify the correct exception, making sure the server was shutdown, and cleaning up if anything // goes wrong so that awaitShutdown doesn't hang case e: Exception => - assertTrue(exceptionClassTag.runtimeClass.isInstance(e), s"Unexpected exception $e") - assertEquals(if (quorum == "zk") BrokerState.NOT_RUNNING else BrokerState.SHUTTING_DOWN, brokers.head.brokerState) + assertCause(exceptionClassTag.runtimeClass, e) + assertEquals(if (isKRaftTest()) BrokerState.SHUTTING_DOWN else BrokerState.NOT_RUNNING, brokers.head.brokerState) } finally { shutdownBroker() } } + private def assertCause(expectedClass: Class[_], e: Throwable): Unit = { + var cause = e + while (cause != null) { + if (expectedClass.isInstance(cause)) { + return + } + cause = cause.getCause + } + fail(s"Failed to assert cause of $e, expected cause $expectedClass") + } + private[this] def isNonDaemonKafkaThread(t: Thread): Boolean = { !t.isDaemon && t.isAlive && t.getName.startsWith(this.getClass.getName) }