[DO NOT MERGE] POC on reduce produce latency #470

Open · wants to merge 6 commits into base: 3.0-li
@@ -56,7 +56,8 @@ public static LiCombinedControlRequestData.LeaderAndIsrPartitionState transformL
.setReplicas(partitionState.replicas())
.setAddingReplicas(partitionState.addingReplicas())
.setRemovingReplicas(partitionState.removingReplicas())
.setIsNew(partitionState.isNew());
.setIsNew(partitionState.isNew())
.setBlockFollowerFromAddingBack(partitionState.blockFollowerFromAddingBack());
}

public static LeaderAndIsrRequestData.LeaderAndIsrPartitionState restoreLeaderAndIsrPartition(
@@ -71,7 +72,8 @@ public static LeaderAndIsrRequestData.LeaderAndIsrPartitionState restoreLeaderAn
.setReplicas(partitionState.replicas())
.setAddingReplicas(partitionState.addingReplicas())
.setRemovingReplicas(partitionState.removingReplicas())
.setIsNew(partitionState.isNew());
.setIsNew(partitionState.isNew())
.setBlockFollowerFromAddingBack(partitionState.blockFollowerFromAddingBack());
}

public static LiCombinedControlRequestData.UpdateMetadataPartitionState transformUpdateMetadataPartition(
@@ -89,7 +89,9 @@
{ "name": "RemovingReplicas", "type": "[]int32", "versions": "4+", "ignorable": true, "entityType": "brokerId",
"about": "The replica IDs that we are removing this partition from, or null if no replicas are being removed." },
{ "name": "IsNew", "type": "bool", "versions": "1+", "default": "false", "ignorable": true,
"about": "Whether the replica should have existed on the broker or not." }
"about": "Whether the replica should have existed on the broker or not." },
{ "name": "BlockFollowerFromAddingBack", "type": "bool", "tag": 0, "taggedVersions": "6+", "versions": "6+", "default": "false",
"about": "Whether the leader should block follower being adding back in a short period."}
]}
]
}
@@ -127,7 +127,9 @@
{ "name": "RemovingReplicas", "type": "[]int32", "versions": "0+", "ignorable": true,
"about": "The replica IDs that we are removing this partition from, or null if no replicas are being removed." },
{ "name": "IsNew", "type": "bool", "versions": "0+", "default": "false", "ignorable": true,
"about": "Whether the replica should have existed on the broker or not." }
"about": "Whether the replica should have existed on the broker or not." },
{ "name": "BlockFollowerFromAddingBack", "type": "bool", "tag": 0, "taggedVersions": "1+", "versions": "1+", "default": "false",
"about": "Whether the leader should block follower being adding back in a short period."}
]},
{ "name": "UpdateMetadataPartitionState", "versions": "0+", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0", "entityType": "topicName", "ignorable": true,
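Both schema hunks add `BlockFollowerFromAddingBack` as a tagged optional boolean with a default of `false`, so brokers negotiating an older request version never see the field and behave as before. A hedged sketch of how controller code could populate the flag on a generated partition-state object (illustrative values; `setBlockFollowerFromAddingBack` is the accessor generated from this schema change, the other setters are existing generated accessors):

```scala
// Hedged sketch, not part of this PR: building a LeaderAndIsrPartitionState with the new flag.
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import scala.jdk.CollectionConverters._

val state = new LeaderAndIsrPartitionState()
  .setTopicName("test-topic")                 // illustrative values
  .setPartitionIndex(0)
  .setLeader(1)
  .setIsr(List(1, 2).map(Integer.valueOf).asJava)
  .setIsNew(false)
  .setBlockFollowerFromAddingBack(true)       // tagged field: serialized only for versions that define it
```

Because the field is tagged, leaving it unset is equivalent to sending `false`, which keeps mixed-version clusters on the old behavior.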
34 changes: 31 additions & 3 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -42,7 +42,7 @@ import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{IsolationLevel, TopicPartition, Uuid}

import scala.collection.{Map, Seq}
import scala.collection.{Map, Seq, mutable}
import scala.jdk.CollectionConverters._

trait IsrChangeListener {
@@ -275,6 +275,9 @@ class Partition(val topicPartition: TopicPartition,
newGauge("LastStableOffsetLag", () => log.map(_.lastStableOffsetLag).getOrElse(0), tags)
ISR_STATES_TO_CREATE_METRICS.foreach(c => newGauge(s"${c.getSimpleName}", () => if (isrStateClass.equals(c)) 1 else 0, tags))

private val expandIsrLocks: mutable.Map[Int, Long] = mutable.Map.empty[Int, Long]
private val expandIsrLockTime: Long = 60000 // Block a replica from being re-added to the ISR for 60 seconds

def isrStateClass: Class[_ <: IsrState] = isrState.getClass

def isUnderReplicated: Boolean = isLeader && (assignmentState.replicationFactor - isrState.isr.size) > 0
@@ -549,6 +552,8 @@ class Partition(val topicPartition: TopicPartition,
highWatermarkCheckpoints: OffsetCheckpoints,
topicId: Option[Uuid]): Boolean = {
val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
info(s"Making leader to partition $partitionState")

// record the epoch of the controller that made the leadership decision. This is useful while updating the isr
// to maintain the decision maker controller's epoch in the zookeeper path
controllerEpoch = partitionState.controllerEpoch
Expand All @@ -561,7 +566,8 @@ class Partition(val topicPartition: TopicPartition,
assignment = partitionState.replicas.asScala.map(_.toInt),
isr = isr,
addingReplicas = addingReplicas,
removingReplicas = removingReplicas
removingReplicas = removingReplicas,
blockFollowerFromAddingBack = partitionState.blockFollowerFromAddingBack()
)
try {
createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId)
@@ -745,7 +751,8 @@ class Partition(val topicPartition: TopicPartition,
def updateAssignmentAndIsr(assignment: Seq[Int],
isr: Set[Int],
addingReplicas: Seq[Int],
removingReplicas: Seq[Int]): Unit = {
removingReplicas: Seq[Int],
blockFollowerFromAddingBack: Boolean = false): Unit = {
val newRemoteReplicas = assignment.filter(_ != localBrokerId)
val removedReplicas = remoteReplicasMap.keys.filter(!newRemoteReplicas.contains(_))

Expand All @@ -758,9 +765,19 @@ class Partition(val topicPartition: TopicPartition,
assignmentState = OngoingReassignmentState(addingReplicas, removingReplicas, assignment)
else
assignmentState = SimpleAssignmentState(assignment)

if (blockFollowerFromAddingBack) {
info(s"hgeng: block follower from adding to isr: $isr")
blockFollowersFromAddingBackToIsr(isr)
}
isrState = CommittedIsr(isr)
}

def blockFollowersFromAddingBackToIsr(isr: Set[Int]): Unit = {
val followersKickedOutOfIsr = isrState.isr.diff(isr)
followersKickedOutOfIsr.foreach(replica => expandIsrLocks(replica) = System.currentTimeMillis())
}

/**
* Check and maybe expand the ISR of the partition.
* A replica will be added to ISR if its LEO >= current hw of the partition and it is caught up to
@@ -1379,6 +1396,17 @@ class Partition(val topicPartition: TopicPartition,
}

private[cluster] def expandIsr(newInSyncReplica: Int): Unit = {
if (expandIsrLocks.contains(newInSyncReplica)) {
if (System.currentTimeMillis() < expandIsrLocks(newInSyncReplica) + expandIsrLockTime) {
info(s"hgeng: Avoid adding $newInSyncReplica to isr as it is locked since ${expandIsrLocks(newInSyncReplica)}")
return
} else {
info(s"hgeng: Remove $newInSyncReplica from expandIsrLock map. Current time ${System.currentTimeMillis()}" +
s", lock time ${expandIsrLocks(newInSyncReplica)}")
expandIsrLocks.remove(newInSyncReplica)
}
}

// This is called from maybeExpandIsr which holds the ISR write lock
if (!isrState.isInflight) {
// When expanding the ISR, we can safely assume the new replica will make it into the ISR since this puts us in
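The Partition.scala changes above implement a per-replica time lock: when a LeaderAndIsr request arrives with the flag set, followers dropped from the ISR are remembered with a timestamp, and `expandIsr` refuses to re-add them until 60 seconds have passed. A self-contained sketch of that mechanism under illustrative names (not the exact Partition internals):

```scala
import scala.collection.mutable

// Sketch: remember when a follower was evicted from the ISR and refuse to re-admit it
// until a fixed window has elapsed (60 s in this PR).
class IsrReaddBlocker(blockMs: Long = 60000L) {
  private val blockedAtMs = mutable.Map.empty[Int, Long]

  // Called while applying a LeaderAndIsr request that carries blockFollowerFromAddingBack = true.
  def block(removedFromIsr: Set[Int]): Unit = {
    val now = System.currentTimeMillis()
    removedFromIsr.foreach(replica => blockedAtMs(replica) = now)
  }

  // Called from the ISR-expansion path; true means the replica may rejoin now.
  def mayRejoin(replica: Int): Boolean = blockedAtMs.get(replica) match {
    case Some(ts) if System.currentTimeMillis() < ts + blockMs => false // still inside the block window
    case Some(_) => blockedAtMs.remove(replica); true                   // window elapsed, forget the entry
    case None => true
  }
}
```

In the PR itself the map lives directly on `Partition` and is consulted at the top of `expandIsr`, which (per the existing comment) runs under the ISR write lock, so the unsynchronized map is only touched under that lock.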
@@ -553,16 +553,18 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
topicPartition: TopicPartition,
leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
replicaAssignment: ReplicaAssignment,
isNew: Boolean): Unit = {
addLeaderAndIsrRequestForBrokers(brokerIds, topicPartition, leaderIsrAndControllerEpoch, replicaAssignment, isNew, ControllerContextSnapshot(controllerContext))
isNew: Boolean,
blockFollowerFromAddingBack: Boolean = false): Unit = {
addLeaderAndIsrRequestForBrokers(brokerIds, topicPartition, leaderIsrAndControllerEpoch, replicaAssignment, isNew, ControllerContextSnapshot(controllerContext), blockFollowerFromAddingBack)
}

def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int],
topicPartition: TopicPartition,
leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
replicaAssignment: ReplicaAssignment,
isNew: Boolean,
controllerContextSnapshot: ControllerContextSnapshot): Unit = {
controllerContextSnapshot: ControllerContextSnapshot,
blockFollowerFromAddingBack: Boolean): Unit = {

brokerIds.filter(_ >= 0).foreach { brokerId =>
val result = leaderAndIsrRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty)
@@ -579,7 +581,8 @@
.setReplicas(replicaAssignment.replicas.map(Integer.valueOf).asJava)
.setAddingReplicas(replicaAssignment.addingReplicas.map(Integer.valueOf).asJava)
.setRemovingReplicas(replicaAssignment.removingReplicas.map(Integer.valueOf).asJava)
.setIsNew(isNew || alreadyNew))
.setIsNew(isNew || alreadyNew)
.setBlockFollowerFromAddingBack(blockFollowerFromAddingBack))
}

addUpdateMetadataRequestForBrokers(controllerContextSnapshot.liveOrShuttingDownBrokerIds.toSeq, Set(topicPartition),
@@ -661,6 +664,8 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
else if (config.interBrokerProtocolVersion >= KAFKA_1_0_IV0) 1
else 0

var blockFollowerAddingBack = false

val maxBrokerEpoch = controllerContext.maxBrokerEpoch
leaderAndIsrRequestMap.forKeyValue { (broker, leaderAndIsrPartitionStates) =>
if (controllerContext.liveOrShuttingDownBrokerIds.contains(broker)) {
@@ -674,8 +679,14 @@
} else {
"become-follower"
}
if (state.blockFollowerFromAddingBack()) {
blockFollowerAddingBack = true
info("hgeng:Blocking follower from adding back is true")
}

if (stateChangeLog.isTraceEnabled)
stateChangeLog.trace(s"Sending $typeOfRequest LeaderAndIsr request $state to broker $broker for partition $topicPartition")
stateChangeLog.trace(s"hgeng: Sending $typeOfRequest LeaderAndIsr request $state to broker $broker for partition $topicPartition" +
s"blockFollowerAddingBack $blockFollowerAddingBack")
}
stateChangeLog.info(s"Sending LeaderAndIsr request to broker $broker with $numBecomeLeaders become-leader " +
s"and ${leaderAndIsrPartitionStates.size - numBecomeLeaders} become-follower partitions")
@@ -698,6 +709,9 @@
controllerEpoch, brokerEpoch, maxBrokerEpoch, leaderAndIsrPartitionStates.values.toBuffer.asJava, topicIds.asJava, leaders.asJava)
}

if (blockFollowerAddingBack) {
info(s"hgeng: LeaderAndIsrRequestBuilder is $leaderAndIsrRequestBuilder")
}

sendRequest(broker, leaderAndIsrRequestBuilder, (r: AbstractResponse) => {
val leaderAndIsrResponse = r.asInstanceOf[LeaderAndIsrResponse]
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -104,6 +104,8 @@ class ControllerContext {

val topicsToBeDeleted = mutable.Set.empty[String]

val replicasBeingShutdown = mutable.Set.empty[PartitionAndReplica]

/** The following topicsWithDeletionStarted variable is used to properly update the offlinePartitionCount metric.
* When a topic is going through deletion, we don't want to keep track of its partition state
* changes in the offlinePartitionCount metric. This goal means if some partitions of a topic are already
24 changes: 11 additions & 13 deletions core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1745,7 +1745,7 @@ class KafkaController(val config: KafkaConfig,

zkClient.recordBrokerShutdown(id, brokerEpoch, controllerContext.epochZkVersion)
controllerContext.shuttingDownBrokerIds += (id -> brokerEpoch)
info(s"Shutting down broker $id")
info(s"hgeng:Shutting down broker $id")

debug(s"All shutting down brokers: ${controllerContext.shuttingDownBrokerIds.mkString(",")}")
debug(s"Live brokers: ${controllerContext.liveBrokerIds.mkString(",")}")
@@ -1759,19 +1759,17 @@
controllerContext.partitionLeadershipInfo(partition).get.leaderAndIsr.leader == id
}
partitionStateMachine.handleStateChanges(partitionsLedByBroker.toSeq, OnlinePartition, Some(ControlledShutdownPartitionLeaderElectionStrategy))
try {
brokerRequestBatch.newBatch()
partitionsFollowedByBroker.foreach { partition =>
brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), partition, deletePartition = false)
}
brokerRequestBatch.sendRequestsToBrokers(epoch)
} catch {
case e: IllegalStateException =>
handleIllegalState(e)
}


// If the broker is a follower, updates the isr in ZK and notifies the current leader
replicaStateMachine.handleStateChanges(partitionsFollowedByBroker.map(partition =>
PartitionAndReplica(partition, id)).toSeq, OfflineReplica)
val followerReplicas = partitionsFollowedByBroker.map(partition => PartitionAndReplica(partition, id)).toSeq
controllerContext.replicasBeingShutdown ++= followerReplicas
info(s"hgeng: replicas being shutdown: ${controllerContext.replicasBeingShutdown}")
info("hgeng: Handling state changes")
replicaStateMachine.handleStateChanges(followerReplicas, OfflineReplica)
info("hgeng: finished handling changes")
controllerContext.replicasBeingShutdown --= followerReplicas
info(s"hgeng: replicas being shutdown: ${controllerContext.replicasBeingShutdown}")
trace(s"All leaders = ${controllerContext.partitionsLeadershipInfo.mkString(",")}")
if (shouldSkipShutdownSafetyCheck) {
// When skipping shutdown safety check, we allow the broker to shutdown even though it may be the leader for some partitions.
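The controlled-shutdown path above no longer sends a StopReplica batch to the follower directly; instead the follower replicas of the shutting-down broker are recorded in `replicasBeingShutdown` for the duration of their OfflineReplica transition, which lets the replica state machine (next file) tag the resulting LeaderAndIsr requests. A condensed sketch of that flow; the try/finally is an assumption on my part, shown as one way to guarantee the set is cleaned up even if the state change throws:

```scala
// Sketch of the revised shutdown flow (simplified; types are the controller's existing ones).
val followerReplicas = partitionsFollowedByBroker.map(p => PartitionAndReplica(p, id)).toSeq
controllerContext.replicasBeingShutdown ++= followerReplicas
try {
  // The OfflineReplica transition removes the replica from each ISR and, because the replica is
  // in replicasBeingShutdown, the LeaderAndIsr request sent to the leader carries the block flag.
  replicaStateMachine.handleStateChanges(followerReplicas, OfflineReplica)
} finally {
  controllerContext.replicasBeingShutdown --= followerReplicas
}
```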
16 changes: 11 additions & 5 deletions core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -260,7 +260,8 @@ class ZkReplicaStateMachine(config: KafkaConfig,
leaderIsrAndControllerEpoch,
controllerContext.partitionFullReplicaAssignment(partition),
isNew = false,
controllerContextSnapshot)
controllerContextSnapshot,
false)
case None =>
}
timingsOpt.foreach { timings =>
Expand All @@ -279,22 +280,27 @@ class ZkReplicaStateMachine(config: KafkaConfig,
}
}
case OfflineReplica =>
validReplicas.foreach { replica =>
// Do not send StopReplicaRequest to brokers that are undergoing controlled shutdown
validReplicas.filter { replica => !controllerContext.replicasBeingShutdown.contains(replica) }.foreach { replica =>
controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId), replica.topicPartition, deletePartition = false)
}
val (replicasWithLeadershipInfo, replicasWithoutLeadershipInfo) = validReplicas.partition { replica =>
controllerContext.partitionLeadershipInfo(replica.topicPartition).isDefined
}
val updatedLeaderIsrAndControllerEpochs = removeReplicasFromIsr(replicaId, replicasWithLeadershipInfo.map(_.topicPartition))
updatedLeaderIsrAndControllerEpochs.forKeyValue { (partition, leaderIsrAndControllerEpoch) =>
stateLogger.info(s"Partition $partition state changed to $leaderIsrAndControllerEpoch after removing replica $replicaId from the ISR as part of transition to $OfflineReplica")
stateLogger.info(s"hgeng: Partition $partition state changed to $leaderIsrAndControllerEpoch after removing replica $replicaId from the ISR as part of transition to $OfflineReplica")
if (!controllerContext.isTopicQueuedUpForDeletion(partition.topic)) {
val recipients = controllerContext.partitionReplicaAssignment(partition).filterNot(_ == replicaId)
val recipients = controllerContext.partitionReplicaAssignment(partition)
// possible PERF TODO? could add controllerContextSnapshot as 6th arg here, too:
stateLogger.info(s"hgeng: partition: $partition" +
s"replicaId: $replicaId, contains? ${controllerContext.replicasBeingShutdown.contains(PartitionAndReplica(partition, replicaId))}")
controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipients,
partition,
leaderIsrAndControllerEpoch,
controllerContext.partitionFullReplicaAssignment(partition), isNew = false)
controllerContext.partitionFullReplicaAssignment(partition),
isNew = false,
controllerContext.replicasBeingShutdown.contains(PartitionAndReplica(partition, replicaId)))
}
val replica = PartitionAndReplica(partition, replicaId)
val currentState = controllerContext.replicaState(replica)
11 changes: 10 additions & 1 deletion core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -54,7 +54,8 @@ class DelayedProduce(delayMs: Long,
produceMetadata: ProduceMetadata,
replicaManager: ReplicaManager,
responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
lockOpt: Option[Lock] = None)
lockOpt: Option[Lock] = None,
creationTime: Long = System.currentTimeMillis())
extends DelayedOperation(delayMs, lockOpt) {

import DelayedOperation._
@@ -126,6 +127,14 @@
* Upon completion, return the current response status along with the error code per partition
*/
override def onComplete(): Unit = {
val timeToComplete = System.currentTimeMillis() - creationTime
if (timeToComplete > 1000) {
info(s"Delayed produce request took ${timeToComplete} ms to complete," +
s" partitions: ${produceMetadata.produceStatus.keySet}")
}

val responseStatus = produceMetadata.produceStatus.map { case (k, status) => k -> status.responseStatus }
responseCallback(responseStatus)
}
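The DelayedProduce change stamps a creation time and logs only completions slower than one second, which keeps the instrumentation cheap on the hot path. A stripped-down sketch of the same measure-on-completion pattern outside Kafka's DelayedOperation machinery (names are illustrative):

```scala
// Sketch: record creation time, measure at completion, log only when slower than a threshold.
class TimedCompletion(thresholdMs: Long = 1000L) {
  private val createdAtMs = System.currentTimeMillis()

  def complete(description: String)(body: => Unit): Unit = {
    val elapsedMs = System.currentTimeMillis() - createdAtMs
    if (elapsedMs > thresholdMs)
      println(s"delayed work took $elapsedMs ms to complete: $description")
    body
  }
}

// Usage sketch:
// val timed = new TimedCompletion()
// ... later, when the operation is satisfied or expires ...
// timed.complete(partitions.mkString(",")) { sendResponse() }
```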
6 changes: 6 additions & 0 deletions core/src/main/scala/kafka/server/KafkaApis.scala
@@ -289,6 +289,12 @@ class KafkaApis(val requestChannel: RequestChannel,
}

private def doHandleLeaderAndIsrRequest(correlationId: Int, leaderAndIsrRequest: LeaderAndIsrRequest): LeaderAndIsrResponse = {

leaderAndIsrRequest.partitionStates().forEach(state =>
info(s"hgeng: Handling LAIR request partition state $state")
)
replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest,
RequestHandlerHelper.onLeadershipChange(groupCoordinator, txnCoordinator, _, _))
}