diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 146dfc8f4665d..24eb5924fdd00 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -880,7 +880,7 @@ class KafkaController(val config: KafkaConfig, // between the moment this broker started and right now when it becomes controller again. loadMinIsrForTopics(controllerContext.allTopics) - rearrangePartitionReplicaAssignmentForNewPartitions(controllerContext.allTopics.toSet, true) + rearrangePartitionReplicaAssignmentForNewPartitions(controllerContext.allTopics.toSet) registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq) getReplicaAssignmentPolicyCompliant(controllerContext.allTopics.toSet).foreach { case (topicPartition, replicaAssignment) => @@ -968,15 +968,10 @@ class KafkaController(val config: KafkaConfig, // Rearrange partition and replica assignment for new topics that get assigned to // maintenance brokers that do not take new partitions - private def rearrangePartitionReplicaAssignmentForNewPartitions(topics: Set[String], onlyNewTopics: Boolean) { + private def rearrangePartitionReplicaAssignmentForNewPartitions(topicsToCheck: Set[String]) { try { val noNewPartitionBrokers = partitionUnassignableBrokerIds if (noNewPartitionBrokers.nonEmpty) { - val topicsToCheck = if (onlyNewTopics) - zkClient.getPartitionNodeNonExistsTopics(topics.toSet) - else - topics - val topicsToBeRearranged = zkClient.getPartitionAssignmentForTopics(topicsToCheck.toSet).filter { case (topic, partitionMap) => val existingAssignment = controllerContext.partitionAssignments.getOrElse(topic, mutable.Map.empty) @@ -1705,7 +1700,7 @@ class KafkaController(val config: KafkaConfig, val newTopics = topics -- controllerContext.allTopics val deletedTopics = controllerContext.allTopics -- topics controllerContext.allTopics = topics - rearrangePartitionReplicaAssignmentForNewPartitions(newTopics, true) + rearrangePartitionReplicaAssignmentForNewPartitions(newTopics) registerPartitionModificationsHandlers(newTopics.toSeq) val addedPartitionReplicaAssignment = getReplicaAssignmentPolicyCompliant(newTopics) @@ -1763,7 +1758,7 @@ class KafkaController(val config: KafkaConfig, } if (!isActive) return - rearrangePartitionReplicaAssignmentForNewPartitions(immutable.Set(topic), false) + rearrangePartitionReplicaAssignmentForNewPartitions(immutable.Set(topic)) val partitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic)) val partitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) => controllerContext.partitionReplicaAssignment(topicPartition).isEmpty