Skip to content

Commit

Permalink
[LI-FIXUP] Enable register watch when getting children for federated …
Browse files Browse the repository at this point in the history
…topic namespaces (#498)

This should be a direct fix up to the commit

64c29e7 ([LI-HOTFIX] Change federated topic znode structure #495 )

EXIT_CRITERIA = We can deprecate this pr when all kafka clients have been migrated to xinfra clients and all topic CUDs go through xmd, then we don't need kafka broker to understand both old and new topic acl, it will only need to understand the new acl.
  • Loading branch information
kehuum authored Nov 17, 2023
1 parent 64c29e7 commit 1515321
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 6 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3841,7 +3841,7 @@ class KafkaApis(val requestChannel: RequestChannel,
try {
if (requestedTopics.isEmpty) {
// if empty list passed, list all existing federated topics
val allFederatedTopicZnodes = zkSupport.zkClient.getAllFederatedTopics.toList.asJava
val allFederatedTopicZnodes = zkSupport.zkClient.getAllFederatedTopics().toList.asJava

requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new LiListFederatedTopicZnodesResponse(
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/zk/KafkaZkClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,11 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient,
}
}

def getAllFederatedTopics: Set[String] = {
def getAllFederatedTopics(registerWatch: Boolean = false): Set[String] = {
val namespaces = getChildren(FederatedTopicsZNode.path)
namespaces
// For all topics, generate (topic -> namespace) tuple
.flatMap(namespace => getAllFederatedTopicsInNamespace(namespace).map(_ -> namespace))
.flatMap(namespace => getAllFederatedTopicsInNamespace(namespace, registerWatch).map(_ -> namespace))
// To map to merge potential duplicate of topic -> namespace
.toMap
// Serialize to znode paths
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg

def waitForFederatedTopicZnodes(client: KafkaZkClient, expectedPresent: Seq[String], expectedMissing: Seq[String]): Unit = {
waitUntilTrue(() => {
val topics = client.getAllFederatedTopics
val topics = client.getAllFederatedTopics()
expectedPresent.forall(topicName => topics.contains(topicName)) &&
expectedMissing.forall(topicName => !topics.contains(topicName))
}, "timed out waiting for federated topics")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
waitForFederatedTopicZnodes(zkClient, expectedFedTopic, List())

// check federated topic is created
val federatedTopics = zkClient.getAllFederatedTopics
val federatedTopics = zkClient.getAllFederatedTopics()
assertEquals(1, federatedTopics.size)

// test list federated topic znodes api
Expand All @@ -167,7 +167,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
waitForFederatedTopicZnodes(zkClient, List(), expectedFedTopic)

// after deletion, federated topic znodes should be empty
val result1 = zkClient.getAllFederatedTopics
val result1 = zkClient.getAllFederatedTopics()
assertEquals(0, result1.size)
}

Expand Down

0 comments on commit 1515321

Please sign in to comment.