diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala b/core/src/main/scala/kafka/server/ZkAdminManager.scala index 2852cd141febd..cb6796a09d5ff 100644 --- a/core/src/main/scala/kafka/server/ZkAdminManager.scala +++ b/core/src/main/scala/kafka/server/ZkAdminManager.scala @@ -407,7 +407,7 @@ class ZkAdminManager(val config: KafkaConfig, case e @ (_: ConfigException | _: IllegalArgumentException) => val message = s"Invalid config value for resource $resource: ${e.getMessage}" info(message) - resource -> ApiError.fromThrowable(new InvalidRequestException(message, e)) + resource -> ApiError.fromThrowable(new InvalidConfigurationException(message, e)) case e: Throwable => val configProps = new Properties config.entries.asScala.filter(_.value != null).foreach { configEntry => @@ -427,6 +427,10 @@ class ZkAdminManager(val config: KafkaConfig, private def alterTopicConfigs(resource: ConfigResource, validateOnly: Boolean, configProps: Properties, configEntriesMap: Map[String, String]): (ConfigResource, ApiError) = { val topic = resource.name + if (topic.isEmpty()) { + throw new InvalidRequestException("Default topic resources are not allowed.") + } + if (!metadataCache.contains(topic)) throw new UnknownTopicOrPartitionException(s"The topic '$topic' does not exist.") @@ -489,6 +493,9 @@ class ZkAdminManager(val config: KafkaConfig, resource.`type` match { case ConfigResource.Type.TOPIC => + if (resource.name.isEmpty()) { + throw new InvalidRequestException("Default topic resources are not allowed.") + } val configProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, resource.name) prepareIncrementalConfigs(alterConfigOps, configProps, LogConfig.configKeys) alterTopicConfigs(resource, validateOnly, configProps, configEntriesMap) @@ -511,7 +518,7 @@ class ZkAdminManager(val config: KafkaConfig, case e @ (_: ConfigException | _: IllegalArgumentException) => val message = s"Invalid config value for resource $resource: ${e.getMessage}" info(message) - resource -> ApiError.fromThrowable(new InvalidRequestException(message, e)) + resource -> ApiError.fromThrowable(new InvalidConfigurationException(message, e)) case e: Throwable => // Log client errors at a lower level than unexpected exceptions val message = s"Error processing alter configs request for resource $resource, config $alterConfigOps" diff --git a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala index ab75dc31fb37f..fb1b0d248db13 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala @@ -16,17 +16,20 @@ package kafka.api import java.util import java.util.Properties import java.util.concurrent.ExecutionException + import kafka.integration.KafkaServerTestHarness import kafka.log.LogConfig import kafka.server.{Defaults, KafkaConfig} -import kafka.utils.{Logging, TestUtils} +import kafka.utils.{Logging, TestInfoUtils, TestUtils} import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigsOptions, Config, ConfigEntry} import org.apache.kafka.common.config.{ConfigResource, TopicConfig} -import org.apache.kafka.common.errors.{InvalidRequestException, PolicyViolationException} +import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException, PolicyViolationException} import org.apache.kafka.common.utils.Utils import org.apache.kafka.server.policy.AlterConfigPolicy import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows, assertTrue} -import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout} +import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import scala.annotation.nowarn import scala.jdk.CollectionConverters._ @@ -45,7 +48,7 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with @BeforeEach override def setUp(testInfo: TestInfo): Unit = { super.setUp(testInfo) - TestUtils.waitUntilBrokerMetadataIsPropagated(servers) + TestUtils.waitUntilBrokerMetadataIsPropagated(brokers) } @AfterEach @@ -58,14 +61,25 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with def createConfig: util.Map[String, Object] = Map[String, Object](AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()).asJava - override def generateConfigs = { - val configs = TestUtils.createBrokerConfigs(brokerCount, zkConnect) - configs.foreach(props => props.put(KafkaConfig.AlterConfigPolicyClassNameProp, classOf[Policy])) + override def generateConfigs: collection.Seq[KafkaConfig] = { + val configs = TestUtils.createBrokerConfigs(brokerCount, zkConnectOrNull) + configs.foreach(overrideNodeConfigs) configs.map(KafkaConfig.fromProps) } - @Test - def testValidAlterConfigs(): Unit = { + override def kraftControllerConfigs(): Seq[Properties] = { + val props = new Properties() + overrideNodeConfigs(props) + Seq(props) + } + + private def overrideNodeConfigs(props: Properties): Unit = { + props.put(KafkaConfig.AlterConfigPolicyClassNameProp, classOf[Policy]) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testValidAlterConfigs(quorum: String): Unit = { client = Admin.create(createConfig) // Create topics val topic1 = "describe-alter-configs-topic-1" @@ -79,18 +93,20 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2) createTopic(topic2, 1, 1) - PlaintextAdminIntegrationTest.checkValidAlterConfigs(client, topicResource1, topicResource2) + PlaintextAdminIntegrationTest.checkValidAlterConfigs(client, this, topicResource1, topicResource2) } - @Test - def testInvalidAlterConfigs(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testInvalidAlterConfigs(quorum: String): Unit = { client = Admin.create(createConfig) - PlaintextAdminIntegrationTest.checkInvalidAlterConfigs(zkClient, servers, client) + PlaintextAdminIntegrationTest.checkInvalidAlterConfigs(this, client) } @nowarn("cat=deprecation") - @Test - def testInvalidAlterConfigsDueToPolicy(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testInvalidAlterConfigsDueToPolicy(quorum: String): Unit = { client = Admin.create(createConfig) // Create topics @@ -115,7 +131,7 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with val topicConfigEntries3 = Seq(new ConfigEntry(LogConfig.MinInSyncReplicasProp, "-1")).asJava - val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, servers.head.config.brokerId.toString) + val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, brokers.head.config.brokerId.toString) val brokerConfigEntries = Seq(new ConfigEntry(KafkaConfig.SslTruststorePasswordProp, "12313")).asJava // Alter configs: second is valid, the others are invalid @@ -129,10 +145,11 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.values.keySet) assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException]) alterResult.values.get(topicResource2).get - assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(topicResource3).get).getCause.isInstanceOf[InvalidRequestException]) + assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(topicResource3).get).getCause.isInstanceOf[InvalidConfigurationException]) assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException]) // Verify that the second resource was updated and the others were not + ensureConsistentKRaftMetadata() var describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, topicResource3, brokerResource).asJava) var configs = describeResult.all.get assertEquals(4, configs.size) @@ -157,10 +174,11 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.values.keySet) assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException]) alterResult.values.get(topicResource2).get - assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(topicResource3).get).getCause.isInstanceOf[InvalidRequestException]) + assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(topicResource3).get).getCause.isInstanceOf[InvalidConfigurationException]) assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException]) // Verify that no resources are updated since validate_only = true + ensureConsistentKRaftMetadata() describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, topicResource3, brokerResource).asJava) configs = describeResult.all.get assertEquals(4, configs.size) @@ -173,7 +191,6 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with assertNull(configs.get(brokerResource).get(KafkaConfig.SslTruststorePasswordProp).value) } - } object AdminClientWithPoliciesIntegrationTest { diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index d6aa7a7e9a21e..543b3b80cdca3 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -25,12 +25,13 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit} import java.util.{Collections, Optional, Properties} import java.{time, util} + +import kafka.integration.KafkaServerTestHarness import kafka.log.LogConfig import kafka.security.authorizer.AclEntry -import kafka.server.{Defaults, DynamicConfig, KafkaConfig, KafkaServer} +import kafka.server.{Defaults, DynamicConfig, KafkaConfig} import kafka.utils.TestUtils._ import kafka.utils.{Log4jController, TestInfoUtils, TestUtils} -import kafka.zk.KafkaZkClient import org.apache.kafka.clients.HostResolver import org.apache.kafka.clients.admin.AlterConfigOp.OpType import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource @@ -353,8 +354,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } } - @Test - def testDescribeAndAlterConfigs(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testDescribeAndAlterConfigs(quorum: String): Unit = { client = Admin.create(createConfig) // Create topics @@ -370,8 +372,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { createTopic(topic2) // Describe topics and broker - val brokerResource1 = new ConfigResource(ConfigResource.Type.BROKER, servers(1).config.brokerId.toString) - val brokerResource2 = new ConfigResource(ConfigResource.Type.BROKER, servers(2).config.brokerId.toString) + val brokerResource1 = new ConfigResource(ConfigResource.Type.BROKER, brokers(1).config.brokerId.toString) + val brokerResource2 = new ConfigResource(ConfigResource.Type.BROKER, brokers(2).config.brokerId.toString) val configResources = Seq(topicResource1, topicResource2, brokerResource1, brokerResource2) val describeResult = client.describeConfigs(configResources.asJava) val configs = describeResult.all.get @@ -395,10 +397,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertFalse(maxMessageBytes2.isSensitive) assertFalse(maxMessageBytes2.isReadOnly) - assertEquals(servers(1).config.nonInternalValues.size, configs.get(brokerResource1).entries.size) - assertEquals(servers(1).config.brokerId.toString, configs.get(brokerResource1).get(KafkaConfig.BrokerIdProp).value) + assertEquals(brokers(1).config.nonInternalValues.size, configs.get(brokerResource1).entries.size) + assertEquals(brokers(1).config.brokerId.toString, configs.get(brokerResource1).get(KafkaConfig.BrokerIdProp).value) val listenerSecurityProtocolMap = configs.get(brokerResource1).get(KafkaConfig.ListenerSecurityProtocolMapProp) - assertEquals(servers(1).config.getString(KafkaConfig.ListenerSecurityProtocolMapProp), listenerSecurityProtocolMap.value) + assertEquals(brokers(1).config.getString(KafkaConfig.ListenerSecurityProtocolMapProp), listenerSecurityProtocolMap.value) assertEquals(KafkaConfig.ListenerSecurityProtocolMapProp, listenerSecurityProtocolMap.name) assertFalse(listenerSecurityProtocolMap.isDefault) assertFalse(listenerSecurityProtocolMap.isSensitive) @@ -410,18 +412,18 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertTrue(truststorePassword.isSensitive) assertFalse(truststorePassword.isReadOnly) val compressionType = configs.get(brokerResource1).get(KafkaConfig.CompressionTypeProp) - assertEquals(servers(1).config.compressionType, compressionType.value) + assertEquals(brokers(1).config.compressionType, compressionType.value) assertEquals(KafkaConfig.CompressionTypeProp, compressionType.name) assertTrue(compressionType.isDefault) assertFalse(compressionType.isSensitive) assertFalse(compressionType.isReadOnly) - assertEquals(servers(2).config.nonInternalValues.size, configs.get(brokerResource2).entries.size) - assertEquals(servers(2).config.brokerId.toString, configs.get(brokerResource2).get(KafkaConfig.BrokerIdProp).value) - assertEquals(servers(2).config.logCleanerThreads.toString, + assertEquals(brokers(2).config.nonInternalValues.size, configs.get(brokerResource2).entries.size) + assertEquals(brokers(2).config.brokerId.toString, configs.get(brokerResource2).get(KafkaConfig.BrokerIdProp).value) + assertEquals(brokers(2).config.logCleanerThreads.toString, configs.get(brokerResource2).get(KafkaConfig.LogCleanerThreadsProp).value) - checkValidAlterConfigs(client, topicResource1, topicResource2) + checkValidAlterConfigs(client, this, topicResource1, topicResource2) } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @@ -968,10 +970,11 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { futures.foreach(_.get) } - @Test - def testInvalidAlterConfigs(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testInvalidAlterConfigs(quorum: String): Unit = { client = Admin.create(createConfig) - checkInvalidAlterConfigs(zkClient, servers, client) + checkInvalidAlterConfigs(this, client) } /** @@ -1874,7 +1877,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidConfigurationException], Some("Invalid value zip for configuration compression.type")) } else { - assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException], + assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidConfigurationException], Some("Invalid config value for resource")) } } @@ -2078,7 +2081,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { ).asJava) assertEquals(Set(topic1Resource).asJava, alterResult.values.keySet) - assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidRequestException], + assertFutureExceptionTypeEquals(alterResult.values().get(topic1Resource), classOf[InvalidConfigurationException], Some("Invalid config value for resource")) } @@ -2487,7 +2490,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { object PlaintextAdminIntegrationTest { @nowarn("cat=deprecation") - def checkValidAlterConfigs(client: Admin, topicResource1: ConfigResource, topicResource2: ConfigResource): Unit = { + def checkValidAlterConfigs( + admin: Admin, + test: KafkaServerTestHarness, + topicResource1: ConfigResource, + topicResource2: ConfigResource + ): Unit = { // Alter topics var topicConfigEntries1 = Seq( new ConfigEntry(LogConfig.FlushMsProp, "1000") @@ -2498,7 +2506,7 @@ object PlaintextAdminIntegrationTest { new ConfigEntry(LogConfig.CompressionTypeProp, "lz4") ).asJava - var alterResult = client.alterConfigs(Map( + var alterResult = admin.alterConfigs(Map( topicResource1 -> new Config(topicConfigEntries1), topicResource2 -> new Config(topicConfigEntries2) ).asJava) @@ -2507,7 +2515,8 @@ object PlaintextAdminIntegrationTest { alterResult.all.get // Verify that topics were updated correctly - var describeResult = client.describeConfigs(Seq(topicResource1, topicResource2).asJava) + test.ensureConsistentKRaftMetadata() + var describeResult = admin.describeConfigs(Seq(topicResource1, topicResource2).asJava) var configs = describeResult.all.get assertEquals(2, configs.size) @@ -2530,7 +2539,7 @@ object PlaintextAdminIntegrationTest { new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "0.3") ).asJava - alterResult = client.alterConfigs(Map( + alterResult = admin.alterConfigs(Map( topicResource1 -> new Config(topicConfigEntries1), topicResource2 -> new Config(topicConfigEntries2) ).asJava, new AlterConfigsOptions().validateOnly(true)) @@ -2539,7 +2548,8 @@ object PlaintextAdminIntegrationTest { alterResult.all.get // Verify that topics were not updated due to validateOnly = true - describeResult = client.describeConfigs(Seq(topicResource1, topicResource2).asJava) + test.ensureConsistentKRaftMetadata() + describeResult = admin.describeConfigs(Seq(topicResource1, topicResource2).asJava) configs = describeResult.all.get assertEquals(2, configs.size) @@ -2550,15 +2560,18 @@ object PlaintextAdminIntegrationTest { } @nowarn("cat=deprecation") - def checkInvalidAlterConfigs(zkClient: KafkaZkClient, servers: Seq[KafkaServer], client: Admin): Unit = { + def checkInvalidAlterConfigs( + test: KafkaServerTestHarness, + admin: Admin + ): Unit = { // Create topics val topic1 = "invalid-alter-configs-topic-1" val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1) - TestUtils.createTopic(zkClient, topic1, 1, 1, servers) + createTopicWithAdmin(admin, topic1, test.brokers, numPartitions = 1, replicationFactor = 1) val topic2 = "invalid-alter-configs-topic-2" val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2) - TestUtils.createTopic(zkClient, topic2, 1, 1, servers) + createTopicWithAdmin(admin, topic2, test.brokers, numPartitions = 1, replicationFactor = 1) val topicConfigEntries1 = Seq( new ConfigEntry(LogConfig.MinCleanableDirtyRatioProp, "1.1"), // this value is invalid as it's above 1.0 @@ -2567,23 +2580,24 @@ object PlaintextAdminIntegrationTest { var topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.CompressionTypeProp, "snappy")).asJava - val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, servers.head.config.brokerId.toString) + val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, test.brokers.head.config.brokerId.toString) val brokerConfigEntries = Seq(new ConfigEntry(KafkaConfig.ZkConnectProp, "localhost:2181")).asJava // Alter configs: first and third are invalid, second is valid - var alterResult = client.alterConfigs(Map( + var alterResult = admin.alterConfigs(Map( topicResource1 -> new Config(topicConfigEntries1), topicResource2 -> new Config(topicConfigEntries2), brokerResource -> new Config(brokerConfigEntries) ).asJava) assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.values.keySet) - assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException]) + assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(topicResource1).get).getCause.isInstanceOf[InvalidConfigurationException]) alterResult.values.get(topicResource2).get assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException]) // Verify that first and third resources were not updated and second was updated - var describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, brokerResource).asJava) + test.ensureConsistentKRaftMetadata() + var describeResult = admin.describeConfigs(Seq(topicResource1, topicResource2, brokerResource).asJava) var configs = describeResult.all.get assertEquals(3, configs.size) @@ -2599,19 +2613,20 @@ object PlaintextAdminIntegrationTest { // Alter configs with validateOnly = true: first and third are invalid, second is valid topicConfigEntries2 = Seq(new ConfigEntry(LogConfig.CompressionTypeProp, "gzip")).asJava - alterResult = client.alterConfigs(Map( + alterResult = admin.alterConfigs(Map( topicResource1 -> new Config(topicConfigEntries1), topicResource2 -> new Config(topicConfigEntries2), brokerResource -> new Config(brokerConfigEntries) ).asJava, new AlterConfigsOptions().validateOnly(true)) assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.values.keySet) - assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException]) + assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(topicResource1).get).getCause.isInstanceOf[InvalidConfigurationException]) alterResult.values.get(topicResource2).get assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException]) // Verify that no resources are updated since validate_only = true - describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, brokerResource).asJava) + test.ensureConsistentKRaftMetadata() + describeResult = admin.describeConfigs(Seq(topicResource1, topicResource2, brokerResource).asJava) configs = describeResult.all.get assertEquals(3, configs.size) diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index dda1ac3347de5..f46713337a7e6 100755 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -352,4 +352,13 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness { ) } } + + def ensureConsistentKRaftMetadata(): Unit = { + if (isKRaftTest()) { + TestUtils.ensureConsistentKRaftMetadata( + brokers, + controllerServer + ) + } + } } diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index 875b7605b5762..84d6f5a2ef93d 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -20,7 +20,7 @@ import java.net.InetAddress import java.nio.charset.StandardCharsets import java.util import java.util.Collections.{singletonList, singletonMap} -import java.util.Properties +import java.util.{Collections, Properties} import java.util.concurrent.ExecutionException import kafka.integration.KafkaServerTestHarness @@ -30,7 +30,7 @@ import kafka.server.Constants._ import kafka.zk.ConfigEntityChangeNotificationZNode import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET -import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry} +import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, Config, ConfigEntry} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.config.internals.QuotaConfigs @@ -438,16 +438,28 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk", "kraft")) - def testConfigureDefaultTopic(quorum: String): Unit = { + def testIncrementalAlterDefaultTopicConfig(quorum: String): Unit = { val admin = createAdminClient() try { val resource = new ConfigResource(ConfigResource.Type.TOPIC, "") val op = new AlterConfigOp(new ConfigEntry(FlushMessagesProp, "200000"), SET) - admin.incrementalAlterConfigs(Map(resource -> List(op).asJavaCollection).asJava).all.get - fail("Should fail with InvalidRequestException for topic doesn't exist") - } catch { - case e: ExecutionException => - assertEquals(classOf[InvalidRequestException], e.getCause().getClass()) + val future = admin.incrementalAlterConfigs(Map(resource -> List(op).asJavaCollection).asJava).all + TestUtils.assertFutureExceptionTypeEquals(future, classOf[InvalidRequestException]) + } finally { + admin.close() + } + } + + @nowarn("cat=deprecation") + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testAlterDefaultTopicConfig(quorum: String): Unit = { + val admin = createAdminClient() + try { + val resource = new ConfigResource(ConfigResource.Type.TOPIC, "") + val config = new Config(Collections.singleton(new ConfigEntry(FlushMessagesProp, "200000"))) + val future = admin.alterConfigs(Map(resource -> config).asJava).all + TestUtils.assertFutureExceptionTypeEquals(future, classOf[InvalidRequestException]) } finally { admin.close() }