diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 6923d618e98c..f16fc44b3b33 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -259,6 +259,8 @@ class RequestSendThread(val controllerId: Int, private var firstUpdateMetadataWithPartitionsSent = false + private val maxRequestCountToMerge = 100 + @volatile private var latestRequestStatus = LatestRequestStatus(isInFlight = false, isInQueue = false, 0) // This metric reports the queued time of the latest request from the queue @@ -358,10 +360,14 @@ class RequestSendThread(val controllerId: Int, // one concurrent access case considering the producer of the queue: // an item is put to the queue right after the condition check below. // That behavior does not change correctness since the inserted item will be picked up in the next round - while (!queue.isEmpty && shouldContinueMerging) { + // Fixme: tmp fix for incident-1517. We tried to merge limited amount of requests to a single LiCombinedControlRequest + // so that a single request size is not larger than max limit which is 100MB. + var counter = maxRequestCountToMerge + while (!queue.isEmpty && shouldContinueMerging && maxRequestCountToMerge > 0) { // Continue merging if the item taken from the queue is a // control request and has set `shouldContinueMerging` to true shouldContinueMerging = takeFromQueueAndMerge().exists(_._2) + counter = counter - 1 } val requestBuilder = controllerRequestMerger.pollLatestRequest()