From 64c29e79f4e684518e77c1656d268340e7bb341e Mon Sep 17 00:00:00 2001 From: Ke Hu Date: Wed, 8 Nov 2023 16:22:06 -0800 Subject: [PATCH] [LI-HOTFIX] Change federated topic znode structure (#495) This PR changes the federated topic znodes from /kafka-tracking/federatedTopics/PageViewEvent(data = tracking) to /kafka-tracking/federatedTopics/tracking/PageViewEvent to save space in zk as well as simplify logic in mario side. Mario will make sure topic name uniqueness within same kafka cluster. LI_DESCRIPTION = To save space in zk as well as simplify logic in mario side, federated topic znodes structure are changed to encode namespace into the znode path instead of storing it in data part. Added integ-test to verify list create/delete/list federated topic znode api behavior EXIT_CRITERIA = We can deprecate this pr when all kafka clients have been migrated to xinfra clients and all topic CUDs go through xmd, then we don't need kafka broker to understand both old and new topic acl, it will only need to understand the new acl. --- .../org/apache/kafka/clients/admin/Admin.java | 4 +- .../kafka/clients/admin/KafkaAdminClient.java | 8 +- .../kafka/clients/admin/NoOpAdminClient.java | 4 +- .../LiListFederatedTopicZnodesRequest.json | 3 +- .../kafka/clients/admin/MockAdminClient.java | 4 +- .../main/scala/kafka/server/KafkaApis.scala | 23 +++--- .../main/scala/kafka/zk/KafkaZkClient.scala | 73 +++++++++++-------- core/src/main/scala/kafka/zk/ZkData.scala | 5 +- .../api/PlaintextAdminIntegrationTest.scala | 5 +- 9 files changed, 72 insertions(+), 57 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index 17417361efc4d..44d0752b1e333 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -315,7 +315,7 @@ default ListTopicsResult listTopics() { * @return all federated topic znodes, formatted /namespace/topic */ default ListFederatedTopicZnodesResult listFederatedTopicZnodes() { - return listFederatedTopicZnodes(Collections.emptyList(), new ListFederatedTopicZnodesOptions()); + return listFederatedTopicZnodes(Collections.emptyMap(), new ListFederatedTopicZnodesOptions()); } /** @@ -326,7 +326,7 @@ default ListFederatedTopicZnodesResult listFederatedTopicZnodes() { * @return empty list if the given topic names' znode don't exist; otherwise return the federated topics formatted * /namespace/topic */ - ListFederatedTopicZnodesResult listFederatedTopicZnodes(List federatedTopics, + ListFederatedTopicZnodesResult listFederatedTopicZnodes(Map federatedTopics, ListFederatedTopicZnodesOptions options); /** diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index f99d67d942f1d..b24909e6bb351 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -2033,12 +2033,14 @@ void handleFailure(Throwable throwable) { } @Override - public ListFederatedTopicZnodesResult listFederatedTopicZnodes(List federatedTopics, + public ListFederatedTopicZnodesResult listFederatedTopicZnodes(Map federatedTopics, ListFederatedTopicZnodesOptions options) { final KafkaFutureImpl> federatedTopicZnodesListingFuture = new KafkaFutureImpl<>(); List topicsRequested = new ArrayList<>(); - federatedTopics.forEach(topic -> - topicsRequested.add(new LiListFederatedTopicZnodesRequestData.FederatedTopics().setName(topic))); + federatedTopics.forEach((topic, namespace) -> + topicsRequested.add( + new LiListFederatedTopicZnodesRequestData.FederatedTopics().setName(topic).setNamespace(namespace) + )); final long now = time.milliseconds(); runnable.call(new Call("listFederatedTopicZnodes", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) { diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/NoOpAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/NoOpAdminClient.java index 57a3acc009bf7..e9a93643e491c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/NoOpAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/NoOpAdminClient.java @@ -69,8 +69,8 @@ public ListTopicsResult listTopics(ListTopicsOptions options) { } @Override - public ListFederatedTopicZnodesResult listFederatedTopicZnodes(List federatedTopics, - ListFederatedTopicZnodesOptions options) { + public ListFederatedTopicZnodesResult listFederatedTopicZnodes(Map federatedTopics, + ListFederatedTopicZnodesOptions options) { return null; } diff --git a/clients/src/main/resources/common/message/LiListFederatedTopicZnodesRequest.json b/clients/src/main/resources/common/message/LiListFederatedTopicZnodesRequest.json index 4ba7709ef6220..22af4cf59c175 100644 --- a/clients/src/main/resources/common/message/LiListFederatedTopicZnodesRequest.json +++ b/clients/src/main/resources/common/message/LiListFederatedTopicZnodesRequest.json @@ -23,7 +23,8 @@ "fields": [ { "name": "Topics", "type": "[]FederatedTopics", "versions": "0+", "about": "The simple name of the federated topics", "fields": [ - {"name": "Name", "type": "string", "versions": "0+", "about": "The topic name"} + {"name": "Name", "type": "string", "versions": "0+", "about": "The topic name"}, + {"name": "Namespace", "type": "string", "versions": "0+", "about": "The namespace of the topic"} ]} ] } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index 60cd6d83ad581..6da75123b6214 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -361,8 +361,8 @@ synchronized public ListTopicsResult listTopics(ListTopicsOptions options) { } @Override - public ListFederatedTopicZnodesResult listFederatedTopicZnodes(List federatedTopics, - ListFederatedTopicZnodesOptions options) { + public ListFederatedTopicZnodesResult listFederatedTopicZnodes(Map federatedTopics, + ListFederatedTopicZnodesOptions options) { throw new UnsupportedOperationException("Not implemented yet"); } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 35e3a405cf7ea..75be673c92309 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -3801,9 +3801,16 @@ class KafkaApis(val requestChannel: RequestChannel, } } + val toDeleteZNodes = mutable.Map[String, String]() + federatedTopicZnodesDeleteRequest.data.topics.forEach { topic => + if (results.find(topic.name).errorCode == Errors.NONE.code) { + toDeleteZNodes += topic.name -> topic.namespace + } + } + try { - toDelete.foreach(federatedTopic => { - zkSupport.zkClient.deleteFederatedTopicZNode(federatedTopic) + toDeleteZNodes.foreach(federatedTopic => { + zkSupport.zkClient.deleteFederatedTopicZNode(federatedTopic._1, federatedTopic._2) }) requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => LiDeleteFederatedTopicZnodesResponse.prepareResponse(Errors.NONE, requestThrottleMs, @@ -3844,14 +3851,10 @@ class KafkaApis(val requestChannel: RequestChannel, ) } else { // if non-empty list passed, only list znode values for the given topics - val foundFederatedTopicZnodes = mutable.Set[String]() - - requestedTopics.forEach(topic => { - val curFederatedTopicZnode = zkSupport.zkClient.getFederatedTopic(topic.name()) - if (curFederatedTopicZnode != null) { - foundFederatedTopicZnodes.add(curFederatedTopicZnode) - } - }) + val foundFederatedTopicZnodes = requestedTopics.asScala + // from a list of Options, flatten extracts value from Some, and nothing from None + .flatMap(topic => zkSupport.zkClient.getFederatedTopic(topic.name(), topic.namespace())) + .toSet requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new LiListFederatedTopicZnodesResponse( diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 6f95bb0e50f27..713ed3014ea8d 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -145,37 +145,25 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, }.toMap } - def getFederatedTopic(topic: String): String = { - val getDataRequest = GetDataRequest(FederatedTopicZnode.path(topic)) - val getDataResponse = retryRequestUntilConnected(getDataRequest) - getDataResponse.resultCode match { - case Code.OK => "/" + FederatedTopicZnode.decode(getDataResponse.data) + "/" + topic - case Code.NONODE => null - case _ => throw getDataResponse.resultException.get + private[kafka] def getFederatedTopic(topic: String, namespace: String): Option[String] = { + if (pathExists(FederatedTopicZnode.path(topic, namespace))) { + Some(s"/$namespace/$topic") + } else { + None } } def getAllFederatedTopics: Set[String] = { - val topics = getChildren(FederatedTopicsZNode.path) - - val merge: ((String, String)) => String = { - case (key, value) => "/" + value + "/" + key - } - - val getDataRequests = topics.map(topic => GetDataRequest( - FederatedTopicZnode.path(topic), - ctx = Some(topic))) - - val getDataResponses = retryRequestsUntilConnected(getDataRequests) - getDataResponses.flatMap { getDataResponse => - val topic = getDataResponse.ctx.get.asInstanceOf[String] - getDataResponse.resultCode match { - case Code.OK => Some(topic, FederatedTopicZnode.decode(getDataResponse.data)) - case Code.NONODE => None - case _ => throw getDataResponse.resultException.get - } - }.toMap.map(merge) - }.toSet + val namespaces = getChildren(FederatedTopicsZNode.path) + namespaces + // For all topics, generate (topic -> namespace) tuple + .flatMap(namespace => getAllFederatedTopicsInNamespace(namespace).map(_ -> namespace)) + // To map to merge potential duplicate of topic -> namespace + .toMap + // Serialize to znode paths + .map { case (topic: String, namespace: String) => s"/$namespace/$topic" } + .toSet + } /** * Registers a given broker in zookeeper as the controller and increments controller epoch. @@ -629,6 +617,29 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, } } + /** + * Gets all federated topics in the namespace. + * @param registerWatch indicates if a watch must be registered or not + * @return sequence of topics in the cluster. + * + */ + def getAllFederatedTopicsInNamespace(namespace: String, registerWatch: Boolean = false): Set[String] = { + val getChildrenResponse = retryRequestUntilConnected( + if (paginateTopics) { + debug(s"upgrading GetChildrenRequest to GetChildrenPaginatedRequest for " + + s"'${FederatedTopicZnode.namespacePath(namespace)}'") + GetChildrenPaginatedRequest(FederatedTopicZnode.namespacePath(namespace), registerWatch) + } else { + GetChildrenRequest(FederatedTopicZnode.namespacePath(namespace), registerWatch) + } + ) + getChildrenResponse.resultCode match { + case Code.OK => getChildrenResponse.children.toSet + case Code.NONODE => Set.empty + case _ => throw getChildrenResponse.resultException.get + } + } + /** * Checks the topic existence * @param topicName @@ -1896,12 +1907,12 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, } def createFederatedTopicZNode(topic: String, namespace: String): Unit = { - val path = FederatedTopicZnode.path(topic) - createRecursive(path, FederatedTopicZnode.encode(namespace)) + val path = FederatedTopicZnode.path(topic, namespace) + createRecursive(path) } - def deleteFederatedTopicZNode(topic: String): Unit = { - deletePath(FederatedTopicZnode.path(topic), ZkVersion.MatchAnyVersion, false) + def deleteFederatedTopicZNode(topic: String, namespace: String): Unit = { + deletePath(FederatedTopicZnode.path(topic, namespace), ZkVersion.MatchAnyVersion, false) } private def setConsumerOffset(group: String, topicPartition: TopicPartition, offset: Long): SetDataResponse = { diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala index e074e94d32bb5..2e3c7964e6f66 100644 --- a/core/src/main/scala/kafka/zk/ZkData.scala +++ b/core/src/main/scala/kafka/zk/ZkData.scala @@ -340,9 +340,8 @@ object BrokerIdZNode { } object FederatedTopicZnode { - def path(topic: String) = s"${FederatedTopicsZNode.path}/$topic" - def encode(namespace: String): Array[Byte] = namespace.getBytes(UTF_8) - def decode(bytes: Array[Byte]): String = if (bytes != null) new String(bytes, UTF_8) else "" + def path(topic: String, namespace: String) = s"${FederatedTopicsZNode.path}/$namespace/$topic" + def namespacePath(namespace: String) = s"${FederatedTopicsZNode.path}/$namespace" } object TopicsZNode { diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 4117206522c1f..0f434389b7885 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -153,12 +153,11 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals(1, allZnodes.size()) assertEquals(expectedFedTopicString, allZnodes.get(0)) // 2. test list specific success - val expectedSuccess = client.listFederatedTopicZnodes(Collections.singletonList("federated-test-topic"), - new ListFederatedTopicZnodesOptions()).topics().get() + val expectedSuccess = client.listFederatedTopicZnodes(federatedTopic, new ListFederatedTopicZnodesOptions()).topics().get() assertEquals(1, expectedSuccess.size()) assertEquals(expectedFedTopicString, expectedSuccess.get(0)) // 3. test list specific fail - val expectedFail = client.listFederatedTopicZnodes(Collections.singletonList("non-exist-topic"), + val expectedFail = client.listFederatedTopicZnodes(Collections.singletonMap("non-exist-topic", "tracking"), new ListFederatedTopicZnodesOptions()).topics().get() assertEquals(0, expectedFail.size())