Skip to content

Commit

Permalink
KAFKA-13899: Use INVALID_CONFIG error code consistently in AlterConfi…
Browse files Browse the repository at this point in the history
…g APIs (apache#12162)

In the AlterConfigs/IncrementalAlterConfigs zk handler, we return `INVALID_REQUEST` and `INVALID_CONFIG` inconsistently. The problem is in `LogConfig.validate`. We may either return `ConfigException` or `InvalidConfigException`. When the first of these is thrown, we catch it and convert to `INVALID_REQUEST`. If the latter is thrown, then we return `INVALID_CONFIG`. It seems more appropriate to return `INVALID_CONFIG` consistently, which is what the KRaft implementation already does this. This patch fixes this and converts a few integration tests to KRaft.

Reviewers: José Armando García Sancio <[email protected]>
  • Loading branch information
Jason Gustafson authored May 17, 2022
1 parent 0605198 commit 1103c76
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 63 deletions.
11 changes: 9 additions & 2 deletions core/src/main/scala/kafka/server/ZkAdminManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ class ZkAdminManager(val config: KafkaConfig,
case e @ (_: ConfigException | _: IllegalArgumentException) =>
val message = s"Invalid config value for resource $resource: ${e.getMessage}"
info(message)
resource -> ApiError.fromThrowable(new InvalidRequestException(message, e))
resource -> ApiError.fromThrowable(new InvalidConfigurationException(message, e))
case e: Throwable =>
val configProps = new Properties
config.entries.asScala.filter(_.value != null).foreach { configEntry =>
Expand All @@ -427,6 +427,10 @@ class ZkAdminManager(val config: KafkaConfig,
private def alterTopicConfigs(resource: ConfigResource, validateOnly: Boolean,
configProps: Properties, configEntriesMap: Map[String, String]): (ConfigResource, ApiError) = {
val topic = resource.name
if (topic.isEmpty()) {
throw new InvalidRequestException("Default topic resources are not allowed.")
}

if (!metadataCache.contains(topic))
throw new UnknownTopicOrPartitionException(s"The topic '$topic' does not exist.")

Expand Down Expand Up @@ -489,6 +493,9 @@ class ZkAdminManager(val config: KafkaConfig,

resource.`type` match {
case ConfigResource.Type.TOPIC =>
if (resource.name.isEmpty()) {
throw new InvalidRequestException("Default topic resources are not allowed.")
}
val configProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, resource.name)
prepareIncrementalConfigs(alterConfigOps, configProps, LogConfig.configKeys)
alterTopicConfigs(resource, validateOnly, configProps, configEntriesMap)
Expand All @@ -511,7 +518,7 @@ class ZkAdminManager(val config: KafkaConfig,
case e @ (_: ConfigException | _: IllegalArgumentException) =>
val message = s"Invalid config value for resource $resource: ${e.getMessage}"
info(message)
resource -> ApiError.fromThrowable(new InvalidRequestException(message, e))
resource -> ApiError.fromThrowable(new InvalidConfigurationException(message, e))
case e: Throwable =>
// Log client errors at a lower level than unexpected exceptions
val message = s"Error processing alter configs request for resource $resource, config $alterConfigOps"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@ package kafka.api
import java.util
import java.util.Properties
import java.util.concurrent.ExecutionException

import kafka.integration.KafkaServerTestHarness
import kafka.log.LogConfig
import kafka.server.{Defaults, KafkaConfig}
import kafka.utils.{Logging, TestUtils}
import kafka.utils.{Logging, TestInfoUtils, TestUtils}
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigsOptions, Config, ConfigEntry}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.errors.{InvalidRequestException, PolicyViolationException}
import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException, PolicyViolationException}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.policy.AlterConfigPolicy
import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo, Timeout}
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
Expand All @@ -45,7 +48,7 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
TestUtils.waitUntilBrokerMetadataIsPropagated(brokers)
}

@AfterEach
Expand All @@ -58,14 +61,25 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
def createConfig: util.Map[String, Object] =
Map[String, Object](AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()).asJava

override def generateConfigs = {
val configs = TestUtils.createBrokerConfigs(brokerCount, zkConnect)
configs.foreach(props => props.put(KafkaConfig.AlterConfigPolicyClassNameProp, classOf[Policy]))
override def generateConfigs: collection.Seq[KafkaConfig] = {
val configs = TestUtils.createBrokerConfigs(brokerCount, zkConnectOrNull)
configs.foreach(overrideNodeConfigs)
configs.map(KafkaConfig.fromProps)
}

@Test
def testValidAlterConfigs(): Unit = {
override def kraftControllerConfigs(): Seq[Properties] = {
val props = new Properties()
overrideNodeConfigs(props)
Seq(props)
}

private def overrideNodeConfigs(props: Properties): Unit = {
props.put(KafkaConfig.AlterConfigPolicyClassNameProp, classOf[Policy])
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testValidAlterConfigs(quorum: String): Unit = {
client = Admin.create(createConfig)
// Create topics
val topic1 = "describe-alter-configs-topic-1"
Expand All @@ -79,18 +93,20 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
createTopic(topic2, 1, 1)

PlaintextAdminIntegrationTest.checkValidAlterConfigs(client, topicResource1, topicResource2)
PlaintextAdminIntegrationTest.checkValidAlterConfigs(client, this, topicResource1, topicResource2)
}

@Test
def testInvalidAlterConfigs(): Unit = {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testInvalidAlterConfigs(quorum: String): Unit = {
client = Admin.create(createConfig)
PlaintextAdminIntegrationTest.checkInvalidAlterConfigs(zkClient, servers, client)
PlaintextAdminIntegrationTest.checkInvalidAlterConfigs(this, client)
}

@nowarn("cat=deprecation")
@Test
def testInvalidAlterConfigsDueToPolicy(): Unit = {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testInvalidAlterConfigsDueToPolicy(quorum: String): Unit = {
client = Admin.create(createConfig)

// Create topics
Expand All @@ -115,7 +131,7 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with

val topicConfigEntries3 = Seq(new ConfigEntry(LogConfig.MinInSyncReplicasProp, "-1")).asJava

val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, servers.head.config.brokerId.toString)
val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, brokers.head.config.brokerId.toString)
val brokerConfigEntries = Seq(new ConfigEntry(KafkaConfig.SslTruststorePasswordProp, "12313")).asJava

// Alter configs: second is valid, the others are invalid
Expand All @@ -129,10 +145,11 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.values.keySet)
assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException])
alterResult.values.get(topicResource2).get
assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(topicResource3).get).getCause.isInstanceOf[InvalidRequestException])
assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(topicResource3).get).getCause.isInstanceOf[InvalidConfigurationException])
assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])

// Verify that the second resource was updated and the others were not
ensureConsistentKRaftMetadata()
var describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, topicResource3, brokerResource).asJava)
var configs = describeResult.all.get
assertEquals(4, configs.size)
Expand All @@ -157,10 +174,11 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.values.keySet)
assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException])
alterResult.values.get(topicResource2).get
assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(topicResource3).get).getCause.isInstanceOf[InvalidRequestException])
assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(topicResource3).get).getCause.isInstanceOf[InvalidConfigurationException])
assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])

// Verify that no resources are updated since validate_only = true
ensureConsistentKRaftMetadata()
describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, topicResource3, brokerResource).asJava)
configs = describeResult.all.get
assertEquals(4, configs.size)
Expand All @@ -173,7 +191,6 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
assertNull(configs.get(brokerResource).get(KafkaConfig.SslTruststorePasswordProp).value)
}


}

object AdminClientWithPoliciesIntegrationTest {
Expand Down
Loading

0 comments on commit 1103c76

Please sign in to comment.