Skip to content

Commit

Permalink
KAFKA-13923; Generalize authorizer system test for kraft (apache#12190)
Browse files Browse the repository at this point in the history
Change `ZookeeperAuthorizerTest` to `AuthorizerTest` and add support for KRaft's `StandardAuthorizer` implementation.

Reviewers: David Jacot <[email protected]>
  • Loading branch information
Jason Gustafson authored May 23, 2022
1 parent 4878653 commit b5699b5
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 18 deletions.
3 changes: 2 additions & 1 deletion tests/kafkatest/services/kafka/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ class for details.
METADATA_SNAPSHOT_SEARCH_STR = "%s/__cluster_metadata-0/*.checkpoint" % METADATA_LOG_DIR
METADATA_FIRST_LOG = "%s/__cluster_metadata-0/00000000000000000000.log" % METADATA_LOG_DIR
# Kafka Authorizer
ACL_AUTHORIZER = "kafka.security.authorizer.AclAuthorizer"
ZK_ACL_AUTHORIZER = "kafka.security.authorizer.AclAuthorizer"
KRAFT_ACL_AUTHORIZER = "org.apache.kafka.metadata.authorizer.StandardAuthorizer"
HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "kafka_heap_dump.bin")
INTERBROKER_LISTENER_NAME = 'INTERNAL'
JAAS_CONF_PROPERTY = "java.security.auth.login.config=/mnt/security/jaas.conf"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@
# limitations under the License.

from ducktape.cluster.remoteaccount import RemoteCommandError
from ducktape.mark import matrix
from ducktape.mark import parametrize
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test

from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.security.kafka_acls import ACLs

class ZooKeeperAuthorizerTest(Test):
"""Tests that the ZooKeeper-based Authorizer works wth both ZooKeeper-based and KRaft clusters.
class AuthorizerTest(Test):
"""Tests that the default Authorizer implementations work with both ZooKeeper-based and KRaft clusters.
Alters client quotas, making sure it works.
Rolls Kafka with an authorizer.
Alters client quotas, making sure it fails.
Expand All @@ -36,22 +36,29 @@ class ZooKeeperAuthorizerTest(Test):
"""

def __init__(self, test_context):
super(ZooKeeperAuthorizerTest, self).__init__(test_context=test_context)

self.topic = "test_topic"
# setup ZooKeeper even with KRaft
self.zk = ZookeeperService(test_context, num_nodes=1)
self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
topics={self.topic: {"partitions": 1, "replication-factor": 1}},
controller_num_nodes_override=1, allow_zk_with_kraft=True)
super(AuthorizerTest, self).__init__(test_context=test_context)
self.test_context = test_context

def setUp(self):
# start ZooKeeper even with KRaft
self.zk.start()
self.acls = ACLs(self.test_context)

@cluster(num_nodes=4)
@matrix(metadata_quorum=quorum.all_non_upgrade)
def test_authorizer(self, metadata_quorum):
@parametrize(metadata_quorum=quorum.remote_kraft, authorizer_class=KafkaService.KRAFT_ACL_AUTHORIZER)
@parametrize(metadata_quorum=quorum.remote_kraft, authorizer_class=KafkaService.ZK_ACL_AUTHORIZER)
@parametrize(metadata_quorum=quorum.zk, authorizer_class=KafkaService.ZK_ACL_AUTHORIZER)
def test_authorizer(self, metadata_quorum, authorizer_class):
topics = {"test_topic": {"partitions": 1, "replication-factor": 1}}

if (authorizer_class == KafkaService.KRAFT_ACL_AUTHORIZER):
self.zk = None
else:
self.zk = ZookeeperService(self.test_context, num_nodes=1)
self.zk.start()

self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk,
topics=topics, controller_num_nodes_override=1,
allow_zk_with_kraft=True)

broker_security_protocol = "SSL"
broker_principal = "User:CN=systemtest"
client_security_protocol = "SASL_PLAINTEXT"
Expand Down Expand Up @@ -80,11 +87,11 @@ def test_authorizer(self, metadata_quorum):
# we need to explicitly reconfigure/restart any remote controller quorum
self.kafka.logger.info("Restarting Remote KRaft Controller with authorizer and broker principal as super user")
controller_quorum = self.kafka.controller_quorum
controller_quorum.authorizer_class_name = KafkaService.ACL_AUTHORIZER
controller_quorum.authorizer_class_name = authorizer_class
controller_quorum.server_prop_overrides = [["super.users", broker_principal]] # for broker to work with an authorizer
controller_quorum.restart_cluster()
self.kafka.logger.info("Restarting Kafka with authorizer and broker principal as super user")
self.kafka.authorizer_class_name = KafkaService.ACL_AUTHORIZER
self.kafka.authorizer_class_name = authorizer_class
self.kafka.server_prop_overrides = [["super.users", broker_principal]] # for broker to work with an authorizer
self.kafka.restart_cluster()

Expand Down

0 comments on commit b5699b5

Please sign in to comment.