Skip to content

Commit

Permalink
MINOR: Enable some AdminClient integration tests (apache#12110)
Browse files Browse the repository at this point in the history
Enable KRaft in `AdminClientWithPoliciesIntegrationTes`t and `PlaintextAdminIntegrationTest`. There are some tests not enabled or not as expected yet:

- testNullConfigs, see KAFKA-13863
- testDescribeCluster and testMetadataRefresh, currently we don't get the real controller in KRaft mode so the test may not run as expected

This patch also changes the exception type raised from invalid `IncrementalAlterConfig` requests with the `SUBTRACT` and `APPEND` operations. When the configuration value type is not a list, we now raise `INVALID_CONFIG` instead of `INVALID_REQUEST`.

Reviewers: Luke Chen <[email protected]>, Jason Gustafson <[email protected]>
  • Loading branch information
dengziming authored May 18, 2022
1 parent 62ba4d3 commit 67d00e2
Show file tree
Hide file tree
Showing 3 changed files with 324 additions and 254 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/ConfigAdminManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ object ConfigAdminManager {
case OpType.DELETE => configProps.remove(alterConfigOp.configEntry.name)
case OpType.APPEND => {
if (!listType(alterConfigOp.configEntry.name, configKeys))
throw new InvalidRequestException(s"Config value append is not allowed for config key: ${alterConfigOp.configEntry.name}")
throw new InvalidConfigurationException(s"Config value append is not allowed for config key: ${alterConfigOp.configEntry.name}")
val oldValueList = Option(configProps.getProperty(alterConfigOp.configEntry.name))
.orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue, ConfigDef.Type.LIST)))
.getOrElse("")
Expand All @@ -505,7 +505,7 @@ object ConfigAdminManager {
}
case OpType.SUBTRACT => {
if (!listType(alterConfigOp.configEntry.name, configKeys))
throw new InvalidRequestException(s"Config value subtract is not allowed for config key: ${alterConfigOp.configEntry.name}")
throw new InvalidConfigurationException(s"Config value subtract is not allowed for config key: ${alterConfigOp.configEntry.name}")
val oldValueList = Option(configProps.getProperty(alterConfigOp.configEntry.name))
.orElse(Option(ConfigDef.convertToString(configKeys(configPropName).defaultValue, ConfigDef.Type.LIST)))
.getOrElse("")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@ package kafka.api

import java.util
import java.util.Properties
import java.util.concurrent.ExecutionException

import kafka.integration.KafkaServerTestHarness
import kafka.log.LogConfig
import kafka.server.{Defaults, KafkaConfig}
import kafka.utils.TestUtils.assertFutureExceptionTypeEquals
import kafka.utils.{Logging, TestInfoUtils, TestUtils}
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigsOptions, Config, ConfigEntry}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException, PolicyViolationException}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.policy.AlterConfigPolicy
import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows, assertTrue}
import org.junit.jupiter.api.Assertions.{assertEquals, assertNull}
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
Expand Down Expand Up @@ -143,10 +143,10 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
).asJava)

assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.values.keySet)
assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException])
assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1), classOf[PolicyViolationException])
alterResult.values.get(topicResource2).get
assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(topicResource3).get).getCause.isInstanceOf[InvalidConfigurationException])
assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
assertFutureExceptionTypeEquals(alterResult.values.get(topicResource3), classOf[InvalidConfigurationException])
assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource), classOf[InvalidRequestException])

// Verify that the second resource was updated and the others were not
ensureConsistentKRaftMetadata()
Expand All @@ -172,10 +172,10 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with
).asJava, new AlterConfigsOptions().validateOnly(true))

assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.values.keySet)
assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException])
assertFutureExceptionTypeEquals(alterResult.values.get(topicResource1), classOf[PolicyViolationException])
alterResult.values.get(topicResource2).get
assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(topicResource3).get).getCause.isInstanceOf[InvalidConfigurationException])
assertTrue(assertThrows(classOf[ExecutionException], () => alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException])
assertFutureExceptionTypeEquals(alterResult.values.get(topicResource3), classOf[InvalidConfigurationException])
assertFutureExceptionTypeEquals(alterResult.values.get(brokerResource), classOf[InvalidRequestException])

// Verify that no resources are updated since validate_only = true
ensureConsistentKRaftMetadata()
Expand Down
Loading

0 comments on commit 67d00e2

Please sign in to comment.