diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index 55b5b7b87141b..bb7297475ed62 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -158,7 +158,8 @@ class for details. METADATA_SNAPSHOT_SEARCH_STR = "%s/__cluster_metadata-0/*.checkpoint" % METADATA_LOG_DIR METADATA_FIRST_LOG = "%s/__cluster_metadata-0/00000000000000000000.log" % METADATA_LOG_DIR # Kafka Authorizer - ACL_AUTHORIZER = "kafka.security.authorizer.AclAuthorizer" + ZK_ACL_AUTHORIZER = "kafka.security.authorizer.AclAuthorizer" + KRAFT_ACL_AUTHORIZER = "org.apache.kafka.metadata.authorizer.StandardAuthorizer" HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "kafka_heap_dump.bin") INTERBROKER_LISTENER_NAME = 'INTERNAL' JAAS_CONF_PROPERTY = "java.security.auth.login.config=/mnt/security/jaas.conf" diff --git a/tests/kafkatest/tests/core/zookeeper_authorizer_test.py b/tests/kafkatest/tests/core/authorizer_test.py similarity index 80% rename from tests/kafkatest/tests/core/zookeeper_authorizer_test.py rename to tests/kafkatest/tests/core/authorizer_test.py index 97c20c0c41cd6..20994c5b0dde5 100644 --- a/tests/kafkatest/tests/core/zookeeper_authorizer_test.py +++ b/tests/kafkatest/tests/core/authorizer_test.py @@ -14,7 +14,7 @@ # limitations under the License. from ducktape.cluster.remoteaccount import RemoteCommandError -from ducktape.mark import matrix +from ducktape.mark import parametrize from ducktape.mark.resource import cluster from ducktape.tests.test import Test @@ -22,8 +22,8 @@ from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.security.kafka_acls import ACLs -class ZooKeeperAuthorizerTest(Test): - """Tests that the ZooKeeper-based Authorizer works wth both ZooKeeper-based and KRaft clusters. +class AuthorizerTest(Test): + """Tests that the default Authorizer implementations work with both ZooKeeper-based and KRaft clusters. Alters client quotas, making sure it works. Rolls Kafka with an authorizer. Alters client quotas, making sure it fails. @@ -36,22 +36,29 @@ class ZooKeeperAuthorizerTest(Test): """ def __init__(self, test_context): - super(ZooKeeperAuthorizerTest, self).__init__(test_context=test_context) - - self.topic = "test_topic" - # setup ZooKeeper even with KRaft - self.zk = ZookeeperService(test_context, num_nodes=1) - self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, - topics={self.topic: {"partitions": 1, "replication-factor": 1}}, - controller_num_nodes_override=1, allow_zk_with_kraft=True) + super(AuthorizerTest, self).__init__(test_context=test_context) + self.test_context = test_context + def setUp(self): - # start ZooKeeper even with KRaft - self.zk.start() self.acls = ACLs(self.test_context) @cluster(num_nodes=4) - @matrix(metadata_quorum=quorum.all_non_upgrade) - def test_authorizer(self, metadata_quorum): + @parametrize(metadata_quorum=quorum.remote_kraft, authorizer_class=KafkaService.KRAFT_ACL_AUTHORIZER) + @parametrize(metadata_quorum=quorum.remote_kraft, authorizer_class=KafkaService.ZK_ACL_AUTHORIZER) + @parametrize(metadata_quorum=quorum.zk, authorizer_class=KafkaService.ZK_ACL_AUTHORIZER) + def test_authorizer(self, metadata_quorum, authorizer_class): + topics = {"test_topic": {"partitions": 1, "replication-factor": 1}} + + if (authorizer_class == KafkaService.KRAFT_ACL_AUTHORIZER): + self.zk = None + else: + self.zk = ZookeeperService(self.test_context, num_nodes=1) + self.zk.start() + + self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, + topics=topics, controller_num_nodes_override=1, + allow_zk_with_kraft=True) + broker_security_protocol = "SSL" broker_principal = "User:CN=systemtest" client_security_protocol = "SASL_PLAINTEXT" @@ -80,11 +87,11 @@ def test_authorizer(self, metadata_quorum): # we need to explicitly reconfigure/restart any remote controller quorum self.kafka.logger.info("Restarting Remote KRaft Controller with authorizer and broker principal as super user") controller_quorum = self.kafka.controller_quorum - controller_quorum.authorizer_class_name = KafkaService.ACL_AUTHORIZER + controller_quorum.authorizer_class_name = authorizer_class controller_quorum.server_prop_overrides = [["super.users", broker_principal]] # for broker to work with an authorizer controller_quorum.restart_cluster() self.kafka.logger.info("Restarting Kafka with authorizer and broker principal as super user") - self.kafka.authorizer_class_name = KafkaService.ACL_AUTHORIZER + self.kafka.authorizer_class_name = authorizer_class self.kafka.server_prop_overrides = [["super.users", broker_principal]] # for broker to work with an authorizer self.kafka.restart_cluster()