diff --git a/clients/src/main/java/org/apache/kafka/common/utils/LiCombinedControlTransformer.java b/clients/src/main/java/org/apache/kafka/common/utils/LiCombinedControlTransformer.java
index be54b53380775..382265206ff9f 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/LiCombinedControlTransformer.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/LiCombinedControlTransformer.java
@@ -56,7 +56,8 @@ public static LiCombinedControlRequestData.LeaderAndIsrPartitionState transformL
             .setReplicas(partitionState.replicas())
             .setAddingReplicas(partitionState.addingReplicas())
             .setRemovingReplicas(partitionState.removingReplicas())
-            .setIsNew(partitionState.isNew());
+            .setIsNew(partitionState.isNew())
+            .setBlockFollowerFromAddingBack(partitionState.blockFollowerFromAddingBack());
     }
 
     public static LeaderAndIsrRequestData.LeaderAndIsrPartitionState restoreLeaderAndIsrPartition(
@@ -71,7 +72,8 @@ public static LeaderAndIsrRequestData.LeaderAndIsrPartitionState restoreLeaderAn
             .setReplicas(partitionState.replicas())
             .setAddingReplicas(partitionState.addingReplicas())
             .setRemovingReplicas(partitionState.removingReplicas())
-            .setIsNew(partitionState.isNew());
+            .setIsNew(partitionState.isNew())
+            .setBlockFollowerFromAddingBack(partitionState.blockFollowerFromAddingBack());
     }
 
     public static LiCombinedControlRequestData.UpdateMetadataPartitionState transformUpdateMetadataPartition(
diff --git a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
index 8febea0fab937..fa62e456bf9bb 100644
--- a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
+++ b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
@@ -89,7 +89,9 @@
       { "name": "RemovingReplicas", "type": "[]int32", "versions": "4+", "ignorable": true, "entityType": "brokerId",
         "about": "The replica IDs that we are removing this partition from, or null if no replicas are being removed." },
       { "name": "IsNew", "type": "bool", "versions": "1+", "default": "false", "ignorable": true,
-        "about": "Whether the replica should have existed on the broker or not." }
+        "about": "Whether the replica should have existed on the broker or not." },
+      { "name": "BlockFollowerFromAddingBack", "type": "bool", "tag": 0, "taggedVersions": "6+", "versions": "6+", "default": "false",
+        "about": "Whether the leader should block the follower from being added back to the ISR for a short period." }
     ]}
   ]
 }
diff --git a/clients/src/main/resources/common/message/LiCombinedControlRequest.json b/clients/src/main/resources/common/message/LiCombinedControlRequest.json
index 015f8a93163be..079846cc7fa99 100644
--- a/clients/src/main/resources/common/message/LiCombinedControlRequest.json
+++ b/clients/src/main/resources/common/message/LiCombinedControlRequest.json
@@ -127,7 +127,9 @@
       { "name": "RemovingReplicas", "type": "[]int32", "versions": "0+", "ignorable": true,
         "about": "The replica IDs that we are removing this partition from, or null if no replicas are being removed." },
       { "name": "IsNew", "type": "bool", "versions": "0+", "default": "false", "ignorable": true,
-        "about": "Whether the replica should have existed on the broker or not." }
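The new field is added to both request schemas as a tagged field ("tag": 0) with a default of false, so it is only carried by versions that know about it and older readers fall back to the default. A minimal sketch, assuming the generated accessors shown in this patch:

```scala
import org.apache.kafka.common.message.LeaderAndIsrRequestData

object BlockFlagSketch extends App {
  // Build a partition state with the new flag set; the other fields keep their defaults.
  val state = new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
    .setIsNew(false)
    .setBlockFollowerFromAddingBack(true)

  // Tagged fields fall back to their declared default when absent, so a request serialized at a
  // version that does not carry the tag reads back as false on the receiving broker.
  assert(state.blockFollowerFromAddingBack())
}
```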
+        "about": "Whether the replica should have existed on the broker or not." },
+      { "name": "BlockFollowerFromAddingBack", "type": "bool", "tag": 0, "taggedVersions": "1+", "versions": "1+", "default": "false",
+        "about": "Whether the leader should block the follower from being added back to the ISR for a short period." }
     ]},
     { "name": "UpdateMetadataPartitionState", "versions": "0+", "fields": [
       { "name": "TopicName", "type": "string", "versions": "0", "entityType": "topicName", "ignorable": true,
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 864410dccae21..ac6212c08963f 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -42,7 +42,7 @@ import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid}
 
-import scala.collection.{Map, Seq}
+import scala.collection.{Map, Seq, mutable}
 import scala.jdk.CollectionConverters._
 
 trait IsrChangeListener {
@@ -275,6 +275,9 @@ class Partition(val topicPartition: TopicPartition,
   newGauge("LastStableOffsetLag", () => log.map(_.lastStableOffsetLag).getOrElse(0), tags)
   ISR_STATES_TO_CREATE_METRICS.foreach(c => newGauge(s"${c.getSimpleName}", () => if (isrStateClass.equals(c)) 1 else 0, tags))
 
+  private val expandIsrLocks: mutable.Map[Int, Long] = mutable.Map.empty[Int, Long]
+  private val expandIsrLockTime: Long = 60000 // Block a replica from being added back to the ISR for 60 seconds
+
   def isrStateClass: Class[_ <: IsrState] = isrState.getClass
 
   def isUnderReplicated: Boolean = isLeader && (assignmentState.replicationFactor - isrState.isr.size) > 0
@@ -549,6 +552,8 @@ class Partition(val topicPartition: TopicPartition,
                  highWatermarkCheckpoints: OffsetCheckpoints,
                  topicId: Option[Uuid]): Boolean = {
     val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
+      info(s"Becoming leader with partition state $partitionState")
+
       // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
       // to maintain the decision maker controller's epoch in the zookeeper path
       controllerEpoch = partitionState.controllerEpoch
@@ -561,7 +566,8 @@ class Partition(val topicPartition: TopicPartition,
         assignment = partitionState.replicas.asScala.map(_.toInt),
         isr = isr,
         addingReplicas = addingReplicas,
-        removingReplicas = removingReplicas
+        removingReplicas = removingReplicas,
+        blockFollowerFromAddingBack = partitionState.blockFollowerFromAddingBack()
       )
       try {
         createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId)
@@ -745,7 +751,8 @@ class Partition(val topicPartition: TopicPartition,
   def updateAssignmentAndIsr(assignment: Seq[Int],
                              isr: Set[Int],
                              addingReplicas: Seq[Int],
-                             removingReplicas: Seq[Int]): Unit = {
+                             removingReplicas: Seq[Int],
+                             blockFollowerFromAddingBack: Boolean = false): Unit = {
     val newRemoteReplicas = assignment.filter(_ != localBrokerId)
     val removedReplicas = remoteReplicasMap.keys.filter(!newRemoteReplicas.contains(_))
@@ -758,9 +765,19 @@ class Partition(val topicPartition: TopicPartition,
       assignmentState = OngoingReassignmentState(addingReplicas, removingReplicas, assignment)
     else
       assignmentState = SimpleAssignmentState(assignment)
+
+    if (blockFollowerFromAddingBack) {
+      info(s"hgeng: block follower from adding to isr: $isr")
+      blockFollowersFromAddingBackToIsr(isr)
+    }
     isrState = CommittedIsr(isr)
   }
 
+  def blockFollowersFromAddingBackToIsr(isr: Set[Int]): Unit = {
+    val followersKickedOutOfIsr = isrState.isr.diff(isr)
+    followersKickedOutOfIsr.foreach(replica => expandIsrLocks(replica) = System.currentTimeMillis())
+  }
+
   /**
    * Check and maybe expand the ISR of the partition.
    * A replica will be added to ISR if its LEO >= current hw of the partition and it is caught up to
@@ -1379,6 +1396,17 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   private[cluster] def expandIsr(newInSyncReplica: Int): Unit = {
+    if (expandIsrLocks.contains(newInSyncReplica)) {
+      if (System.currentTimeMillis() < expandIsrLocks(newInSyncReplica) + expandIsrLockTime) {
+        info(s"hgeng: Avoid adding $newInSyncReplica to isr as it is locked since ${expandIsrLocks(newInSyncReplica)}")
+        return
+      } else {
+        info(s"hgeng: Remove $newInSyncReplica from expandIsrLock map. Current time ${System.currentTimeMillis()}" +
+          s", lock time ${expandIsrLocks(newInSyncReplica)}")
+        expandIsrLocks.remove(newInSyncReplica)
+      }
+    }
+
     // This is called from maybeExpandIsr which holds the ISR write lock
     if (!isrState.isInflight) {
       // When expanding the ISR, we can safely assume the new replica will make it into the ISR since this puts us in
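The Partition change keeps a per-replica timestamp (`expandIsrLocks`) and refuses to expand the ISR with that replica until 60 seconds (`expandIsrLockTime`) have elapsed. A self-contained sketch of the same pattern, with illustrative names and an injectable clock so the window can be exercised without sleeping:

```scala
import scala.collection.mutable

// Illustrative stand-in for the bookkeeping inside Partition; not the Partition class itself.
class IsrReaddBlocker(blockMs: Long = 60000L, now: () => Long = () => System.currentTimeMillis()) {
  private val blockedSince = mutable.Map.empty[Int, Long]

  /** Remember the replicas that were just removed from the ISR. */
  def block(replicas: Set[Int]): Unit =
    replicas.foreach(replica => blockedSince(replica) = now())

  /** True if the replica may rejoin the ISR; expired entries are cleaned up. */
  def mayRejoin(replica: Int): Boolean = blockedSince.get(replica) match {
    case Some(since) if now() - since < blockMs => false
    case Some(_) =>
      blockedSince.remove(replica)
      true
    case None => true
  }
}

object IsrReaddBlockerSketch extends App {
  var nowMs = 0L
  val blocker = new IsrReaddBlocker(now = () => nowMs)
  blocker.block(Set(3))
  assert(!blocker.mayRejoin(3)) // still inside the 60-second window
  nowMs = 60001L
  assert(blocker.mayRejoin(3))  // window expired; the replica may be added back
}
```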
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 6923d618e98c6..d0c388f2b5bb7 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -553,8 +553,9 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
                                        topicPartition: TopicPartition,
                                        leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
                                        replicaAssignment: ReplicaAssignment,
-                                       isNew: Boolean): Unit = {
-    addLeaderAndIsrRequestForBrokers(brokerIds, topicPartition, leaderIsrAndControllerEpoch, replicaAssignment, isNew, ControllerContextSnapshot(controllerContext))
+                                       isNew: Boolean,
+                                       blockFollowerFromAddingBack: Boolean = false): Unit = {
+    addLeaderAndIsrRequestForBrokers(brokerIds, topicPartition, leaderIsrAndControllerEpoch, replicaAssignment, isNew, ControllerContextSnapshot(controllerContext), blockFollowerFromAddingBack)
   }
 
   def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int],
@@ -562,7 +563,8 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
                                        leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
                                        replicaAssignment: ReplicaAssignment,
                                        isNew: Boolean,
-                                       controllerContextSnapshot: ControllerContextSnapshot): Unit = {
+                                       controllerContextSnapshot: ControllerContextSnapshot,
+                                       blockFollowerFromAddingBack: Boolean): Unit = {
     brokerIds.filter(_ >= 0).foreach { brokerId =>
       val result = leaderAndIsrRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty)
@@ -579,7 +581,8 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
         .setReplicas(replicaAssignment.replicas.map(Integer.valueOf).asJava)
         .setAddingReplicas(replicaAssignment.addingReplicas.map(Integer.valueOf).asJava)
         .setRemovingReplicas(replicaAssignment.removingReplicas.map(Integer.valueOf).asJava)
-        .setIsNew(isNew || alreadyNew))
+        .setIsNew(isNew || alreadyNew)
+        .setBlockFollowerFromAddingBack(blockFollowerFromAddingBack))
     }
 
     addUpdateMetadataRequestForBrokers(controllerContextSnapshot.liveOrShuttingDownBrokerIds.toSeq, Set(topicPartition),
@@ -661,6 +664,8 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
       else if (config.interBrokerProtocolVersion >= KAFKA_1_0_IV0) 1
       else 0
 
+    var blockFollowerAddingBack = false
+
     val maxBrokerEpoch = controllerContext.maxBrokerEpoch
     leaderAndIsrRequestMap.forKeyValue { (broker, leaderAndIsrPartitionStates) =>
       if (controllerContext.liveOrShuttingDownBrokerIds.contains(broker)) {
@@ -674,8 +679,14 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
           } else {
             "become-follower"
           }
+          if (state.blockFollowerFromAddingBack()) {
+            blockFollowerAddingBack = true
+            info("hgeng:Blocking follower from adding back is true")
+          }
+
           if (stateChangeLog.isTraceEnabled)
-            stateChangeLog.trace(s"Sending $typeOfRequest LeaderAndIsr request $state to broker $broker for partition $topicPartition")
+            stateChangeLog.trace(s"hgeng: Sending $typeOfRequest LeaderAndIsr request $state to broker $broker for partition $topicPartition, " +
+              s"blockFollowerAddingBack $blockFollowerAddingBack")
         }
         stateChangeLog.info(s"Sending LeaderAndIsr request to broker $broker with $numBecomeLeaders become-leader " +
           s"and ${leaderAndIsrPartitionStates.size - numBecomeLeaders} become-follower partitions")
@@ -698,6 +709,9 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
             controllerEpoch, brokerEpoch, maxBrokerEpoch, leaderAndIsrPartitionStates.values.toBuffer.asJava, topicIds.asJava,
             leaders.asJava)
         }
+        if (blockFollowerAddingBack) {
+          info(s"hgeng: LeaderAndIsrRequestBuilder is $leaderAndIsrRequestBuilder")
+        }
         sendRequest(broker, leaderAndIsrRequestBuilder, (r: AbstractResponse) => {
           val leaderAndIsrResponse = r.asInstanceOf[LeaderAndIsrResponse]
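The batching API gains the flag as a defaulted parameter, so existing callers keep compiling while the OfflineReplica path can opt in; mocked expectations (see the test changes at the end of this patch) still have to spell the argument out. A generic sketch of that compatibility pattern, with hypothetical names:

```scala
object DefaultParamSketch extends App {
  // Hypothetical signature mirroring the shape of the change above.
  def addRequest(brokerIds: Seq[Int], isNew: Boolean, blockFollowerFromAddingBack: Boolean = false): String =
    s"brokers=$brokerIds isNew=$isNew block=$blockFollowerFromAddingBack"

  println(addRequest(Seq(1, 2), isNew = true))                                      // existing call sites unchanged
  println(addRequest(Seq(1, 2), isNew = false, blockFollowerFromAddingBack = true)) // new call sites opt in
}
```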
diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala b/core/src/main/scala/kafka/controller/ControllerContext.scala
index 73790869be8c3..4ebe65b081e4e 100644
--- a/core/src/main/scala/kafka/controller/ControllerContext.scala
+++ b/core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -104,6 +104,8 @@ class ControllerContext {
   val topicsToBeDeleted = mutable.Set.empty[String]
 
+  val replicasBeingShutdown = mutable.Set.empty[PartitionAndReplica]
+
   /** The following topicsWithDeletionStarted variable is used to properly update the offlinePartitionCount metric.
    * When a topic is going through deletion, we don't want to keep track of its partition state
    * changes in the offlinePartitionCount metric. This goal means if some partitions of a topic are already
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 6e9b1c19ecfa6..4cd1c369aafa6 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1745,7 +1745,7 @@ class KafkaController(val config: KafkaConfig,
       zkClient.recordBrokerShutdown(id, brokerEpoch, controllerContext.epochZkVersion)
       controllerContext.shuttingDownBrokerIds += (id -> brokerEpoch)
-      info(s"Shutting down broker $id")
+      info(s"hgeng:Shutting down broker $id")
       debug(s"All shutting down brokers: ${controllerContext.shuttingDownBrokerIds.mkString(",")}")
       debug(s"Live brokers: ${controllerContext.liveBrokerIds.mkString(",")}")
@@ -1759,19 +1759,17 @@ class KafkaController(val config: KafkaConfig,
         controllerContext.partitionLeadershipInfo(partition).get.leaderAndIsr.leader == id
       }
       partitionStateMachine.handleStateChanges(partitionsLedByBroker.toSeq, OnlinePartition, Some(ControlledShutdownPartitionLeaderElectionStrategy))
-      try {
-        brokerRequestBatch.newBatch()
-        partitionsFollowedByBroker.foreach { partition =>
-          brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), partition, deletePartition = false)
-        }
-        brokerRequestBatch.sendRequestsToBrokers(epoch)
-      } catch {
-        case e: IllegalStateException =>
-          handleIllegalState(e)
-      }
+
+      // If the broker is a follower, updates the isr in ZK and notifies the current leader
-      replicaStateMachine.handleStateChanges(partitionsFollowedByBroker.map(partition =>
-        PartitionAndReplica(partition, id)).toSeq, OfflineReplica)
+      val followerReplicas = partitionsFollowedByBroker.map(partition => PartitionAndReplica(partition, id)).toSeq
+      controllerContext.replicasBeingShutdown ++= followerReplicas
+      info(s"hgeng: replicas being shutdown: ${controllerContext.replicasBeingShutdown}")
+      info("hgeng: Handling state changes")
+      replicaStateMachine.handleStateChanges(followerReplicas, OfflineReplica)
+      info("hgeng: finished handling changes")
+      controllerContext.replicasBeingShutdown --= followerReplicas
+      info(s"hgeng: replicas being shutdown: ${controllerContext.replicasBeingShutdown}")
       trace(s"All leaders = ${controllerContext.partitionsLeadershipInfo.mkString(",")}")
       if (shouldSkipShutdownSafetyCheck) {
         // When skipping shutdown safety check, we allow the broker to shutdown even though it may be the leader for some partitions.
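During a controlled shutdown the controller now records the broker's follower replicas in `replicasBeingShutdown` before driving them to OfflineReplica, and clears them afterwards. A simplified, hypothetical sketch of that mark/act/unmark bookkeeping (the patch does it inline; the try/finally shown here additionally survives exceptions):

```scala
import scala.collection.mutable

// Simplified stand-in for the controller's PartitionAndReplica.
final case class Replica(topic: String, partition: Int, brokerId: Int)

class ShutdownTracker {
  val replicasBeingShutdown: mutable.Set[Replica] = mutable.Set.empty

  /** Mark the replicas for the duration of the state change, then unmark them. */
  def withMarked[T](replicas: Seq[Replica])(stateChange: => T): T = {
    replicasBeingShutdown ++= replicas
    try stateChange
    finally replicasBeingShutdown --= replicas
  }
}
```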
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 8909e75de36e6..b246d395d5ba0 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -260,7 +260,8 @@ class ZkReplicaStateMachine(config: KafkaConfig,
               leaderIsrAndControllerEpoch,
               controllerContext.partitionFullReplicaAssignment(partition),
               isNew = false,
-              controllerContextSnapshot)
+              controllerContextSnapshot,
+              false)
           case None =>
         }
         timingsOpt.foreach { timings =>
@@ -279,7 +280,8 @@ class ZkReplicaStateMachine(config: KafkaConfig,
             }
           }
         case OfflineReplica =>
-          validReplicas.foreach { replica =>
+          // Do not send a StopReplicaRequest to brokers that are going through a controlled shutdown
+          validReplicas.filter { replica => !controllerContext.replicasBeingShutdown.contains(replica) }.foreach { replica =>
             controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId), replica.topicPartition, deletePartition = false)
           }
           val (replicasWithLeadershipInfo, replicasWithoutLeadershipInfo) = validReplicas.partition { replica =>
@@ -287,14 +289,18 @@ class ZkReplicaStateMachine(config: KafkaConfig,
           }
           val updatedLeaderIsrAndControllerEpochs = removeReplicasFromIsr(replicaId, replicasWithLeadershipInfo.map(_.topicPartition))
           updatedLeaderIsrAndControllerEpochs.forKeyValue { (partition, leaderIsrAndControllerEpoch) =>
-            stateLogger.info(s"Partition $partition state changed to $leaderIsrAndControllerEpoch after removing replica $replicaId from the ISR as part of transition to $OfflineReplica")
+            stateLogger.info(s"hgeng: Partition $partition state changed to $leaderIsrAndControllerEpoch after removing replica $replicaId from the ISR as part of transition to $OfflineReplica")
             if (!controllerContext.isTopicQueuedUpForDeletion(partition.topic)) {
-              val recipients = controllerContext.partitionReplicaAssignment(partition).filterNot(_ == replicaId)
+              val recipients = controllerContext.partitionReplicaAssignment(partition)
               // possible PERF TODO? could add controllerContextSnapshot as 6th arg here, too:
+              stateLogger.info(s"hgeng: partition: $partition, " +
+                s"replicaId: $replicaId, contains? ${controllerContext.replicasBeingShutdown.contains(PartitionAndReplica(partition, replicaId))}")
               controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipients,
                 partition,
                 leaderIsrAndControllerEpoch,
-                controllerContext.partitionFullReplicaAssignment(partition), isNew = false)
+                controllerContext.partitionFullReplicaAssignment(partition),
+                isNew = false,
+                controllerContext.replicasBeingShutdown.contains(PartitionAndReplica(partition, replicaId)))
             }
             val replica = PartitionAndReplica(partition, replicaId)
             val currentState = controllerContext.replicaState(replica)
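For the OfflineReplica transition the state machine now skips the StopReplicaRequest for replicas that are in `replicasBeingShutdown`, and sets `blockFollowerFromAddingBack` on the LeaderAndIsrRequest exactly when the offlined replica is one of them. A hypothetical, self-contained restatement of that decision:

```scala
object OfflineTransitionSketch {
  // Simplified stand-in for PartitionAndReplica.
  final case class Replica(topic: String, partition: Int, brokerId: Int)

  def planOfflineTransition(validReplicas: Seq[Replica],
                            beingShutdown: Set[Replica]): (Seq[Replica], Map[Replica, Boolean]) = {
    // Replicas that should still receive a StopReplicaRequest.
    val stopReplicaTargets = validReplicas.filterNot(beingShutdown.contains)
    // Per replica: should the LeaderAndIsrRequest sent to the remaining brokers carry the block flag?
    val blockFlags = validReplicas.map(r => r -> beingShutdown.contains(r)).toMap
    (stopReplicaTargets, blockFlags)
  }
}
```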
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index ced8d5f53a358..a35bfa356c918 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -54,7 +54,8 @@ class DelayedProduce(delayMs: Long,
                      produceMetadata: ProduceMetadata,
                      replicaManager: ReplicaManager,
                      responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
-                     lockOpt: Option[Lock] = None)
+                     lockOpt: Option[Lock] = None,
+                     creationTime: Long = System.currentTimeMillis())
   extends DelayedOperation(delayMs, lockOpt) {
 
   import DelayedOperation._
@@ -126,6 +127,14 @@ class DelayedProduce(delayMs: Long,
    * Upon completion, return the current response status along with the error code per partition
    */
   override def onComplete(): Unit = {
+    val timeToComplete = System.currentTimeMillis() - creationTime
+    if (timeToComplete > 1000) {
+      info(s"Delayed request takes ${timeToComplete} ms to complete," +
+        s" partitions are ${produceMetadata.produceStatus.keysIterator.next()}")
+      info(s"Delayed request takes ${timeToComplete} ms to complete," +
+        s" partitions are ${produceMetadata.produceStatus.keySet}")
+    }
+
     val responseStatus = produceMetadata.produceStatus.map { case (k, status) => k -> status.responseStatus }
     responseCallback(responseStatus)
   }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 2afd58cc88036..4a09218db62b8 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -289,6 +289,12 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   private def doHandleLeaderAndIsrRequest(correlationId: Int, leaderAndIsrRequest: LeaderAndIsrRequest): LeaderAndIsrResponse = {
+
+    leaderAndIsrRequest.partitionStates().forEach( state =>
+      info(s"hgeng: Handling LAIR request partition state ${state}")
+    )
+
+
     replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest,
       RequestHandlerHelper.onLeadershipChange(groupCoordinator, txnCoordinator, _, _))
   }
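The DelayedProduce change stamps the operation at construction and logs completions that took longer than one second. A generic sketch of that instrumentation in isolation, with illustrative names:

```scala
// Not the DelayedOperation hierarchy itself; just the timing idea on its own.
abstract class TimedOperation(slowThresholdMs: Long = 1000L) {
  private val creationTimeMs: Long = System.currentTimeMillis()

  /** Subclasses perform the real completion work here. */
  protected def doComplete(): Unit

  final def onComplete(): Unit = {
    val elapsedMs = System.currentTimeMillis() - creationTimeMs
    if (elapsedMs > slowThresholdMs)
      println(s"delayed operation took $elapsedMs ms to complete") // stand-in for info(...)
    doComplete()
  }
}
```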
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index a6ce3adf306d2..688e5102685f3 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -221,7 +221,7 @@ class KafkaServer(
    */
   override def startup(): Unit = {
     try {
-      info("starting")
+      info("hgeng:1:25: starting")
 
       if (isShuttingDown.get)
         throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
@@ -547,7 +547,8 @@ class KafkaServer(
   }
 
   private def initZkClient(time: Time): Unit = {
-    info(s"Connecting to zookeeper on ${config.zkConnect}")
+    info("hgeng: init zk client")
+    info(s"hgeng fat: Connecting to zookeeper on ${config.zkConnect}")
     val secureAclsEnabled = config.zkEnableSecureAcls
     val isZkSecurityEnabled = JaasUtils.isZkSaslEnabled() || KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig)
@@ -758,7 +759,7 @@ class KafkaServer(
     // We request the controller to do a controlled shutdown. On failure, we backoff for a configured period
     // of time and try again for a configured number of retries. If all the attempt fails, we simply force
     // the shutdown.
-    info("Starting controlled shutdown")
+    info("hgeng:Starting controlled shutdown")
 
     _brokerState = BrokerState.PENDING_CONTROLLED_SHUTDOWN
@@ -775,7 +776,7 @@ class KafkaServer(
    */
   override def shutdown(): Unit = {
     try {
-      info("shutting down")
+      info("hgeng: shutting down")
 
       if (isStartingUp.get)
         throw new IllegalStateException("Kafka server is still starting up, cannot shut down!")
@@ -784,9 +785,14 @@ class KafkaServer(
       // last in the `if` block. If the order is reversed, we could shutdown twice or leave `isShuttingDown` set to
       // `true` at the end of this method.
       if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) {
+        info("hgeng: sending controlled shutdown")
         CoreUtils.swallow(controlledShutdown(), this)
         _brokerState = BrokerState.SHUTTING_DOWN
 
+        // This delay waits for the LeaderAndIsrRequests to take effect, so this broker's follower replicas are removed from their partitions' ISRs.
+        Thread.sleep(60000)
+        info("hgeng: received controlled shutdown success, and slept for 60 seconds")
+
         if (healthCheckScheduler != null)
           healthCheckScheduler.shutdown()
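The fixed 60-second sleep in shutdown() mirrors the 60-second `expandIsrLockTime` in Partition.scala: the broker waits for the LeaderAndIsrRequests to take effect before continuing to shut down. A hypothetical alternative to a hard-coded sleep, not part of this patch, is a bounded poll that proceeds as soon as a condition holds:

```scala
object BoundedWaitSketch {
  // Hypothetical helper: poll a condition up to a deadline instead of sleeping unconditionally.
  def waitUntil(timeoutMs: Long, pollMs: Long = 1000L)(condition: () => Boolean): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMs
    while (System.currentTimeMillis() < deadline) {
      if (condition()) return true
      Thread.sleep(pollMs)
    }
    condition()
  }
}
```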
diff --git a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
index cf3cd6948c74e..57e19b45e7274 100644
--- a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
@@ -195,7 +195,8 @@ class ReplicaStateMachineTest {
     controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
     EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
-      partition, leaderIsrAndControllerEpoch, replicaAssignment(Seq(brokerId)), isNew = false, controllerContextSnapshot))
+      partition, leaderIsrAndControllerEpoch, replicaAssignment(Seq(brokerId)), isNew = false, controllerContextSnapshot,
+      blockFollowerFromAddingBack = false))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
     EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
     replicaStateMachine.handleStateChanges(replicas, OnlineReplica)
@@ -264,7 +265,8 @@ class ReplicaStateMachineTest {
     controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
     EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
-      partition, leaderIsrAndControllerEpoch, replicaAssignment(Seq(brokerId)), isNew = false, controllerContextSnapshot))
+      partition, leaderIsrAndControllerEpoch, replicaAssignment(Seq(brokerId)), isNew = false, controllerContextSnapshot,
+      blockFollowerFromAddingBack = false))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
     EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
     replicaStateMachine.handleStateChanges(replicas, OnlineReplica)
@@ -378,7 +380,8 @@ class ReplicaStateMachineTest {
     controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
    EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
-      partition, leaderIsrAndControllerEpoch, replicaAssignment(Seq(brokerId)), isNew = false, controllerContextSnapshot))
+      partition, leaderIsrAndControllerEpoch, replicaAssignment(Seq(brokerId)), isNew = false, controllerContextSnapshot,
+      blockFollowerFromAddingBack = false))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
     EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
     replicaStateMachine.handleStateChanges(replicas, OnlineReplica)