diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 75be673c92309..a47065a2e31dd 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -3841,7 +3841,7 @@ class KafkaApis(val requestChannel: RequestChannel, try { if (requestedTopics.isEmpty) { // if empty list passed, list all existing federated topics - val allFederatedTopicZnodes = zkSupport.zkClient.getAllFederatedTopics.toList.asJava + val allFederatedTopicZnodes = zkSupport.zkClient.getAllFederatedTopics().toList.asJava requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new LiListFederatedTopicZnodesResponse( diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 713ed3014ea8d..ab49a379c7779 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -153,11 +153,11 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, } } - def getAllFederatedTopics: Set[String] = { + def getAllFederatedTopics(registerWatch: Boolean = false): Set[String] = { val namespaces = getChildren(FederatedTopicsZNode.path) namespaces // For all topics, generate (topic -> namespace) tuple - .flatMap(namespace => getAllFederatedTopicsInNamespace(namespace).map(_ -> namespace)) + .flatMap(namespace => getAllFederatedTopicsInNamespace(namespace, registerWatch).map(_ -> namespace)) // To map to merge potential duplicate of topic -> namespace .toMap // Serialize to znode paths diff --git a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala index ef5a2093a3a92..9194ca1893eb2 100644 --- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala @@ -225,7 +225,7 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg def waitForFederatedTopicZnodes(client: KafkaZkClient, expectedPresent: Seq[String], expectedMissing: Seq[String]): Unit = { waitUntilTrue(() => { - val topics = client.getAllFederatedTopics + val topics = client.getAllFederatedTopics() expectedPresent.forall(topicName => topics.contains(topicName)) && expectedMissing.forall(topicName => !topics.contains(topicName)) }, "timed out waiting for federated topics") diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 0f434389b7885..894f9f0a8a014 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -144,7 +144,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { waitForFederatedTopicZnodes(zkClient, expectedFedTopic, List()) // check federated topic is created - val federatedTopics = zkClient.getAllFederatedTopics + val federatedTopics = zkClient.getAllFederatedTopics() assertEquals(1, federatedTopics.size) // test list federated topic znodes api @@ -167,7 +167,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { waitForFederatedTopicZnodes(zkClient, List(), expectedFedTopic) // after deletion, federated topic znodes should be empty - val result1 = zkClient.getAllFederatedTopics + val result1 = zkClient.getAllFederatedTopics() assertEquals(0, result1.size) }