Skip to content

Commit

Permalink
Revert "[LI-HOTFIX] Return valid data during throttling (#514)"
Browse files Browse the repository at this point in the history
This reverts commit d4f6f91.
  • Loading branch information
Hao Geng committed Jun 11, 2024
1 parent d4f6f91 commit fadb79f
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -410,16 +410,20 @@ public boolean handleResponse(FetchResponse response) {
return false;
}
if (nextMetadata.isFull()) {
if (response.throttleTimeMs() > 0) {
// [LIKAFKA-59133] To avoid stuck consumer, we made a server side change to return valid fetch responses
// even when the request is throttled. To honor the server side change, we log the throttling and still
// handle the fetch response.
if (response.responseData().isEmpty() && response.throttleTimeMs() > 0) {
// Normally, an empty full fetch response would be invalid. However, KIP-219
// specifies that if the broker wants to throttle the client, it will respond
// to a full fetch request with an empty response and a throttleTimeMs
// value set. We don't want to log this with a warning, since it's not an error.
// However, the empty full fetch response can't be processed, so it's still appropriate
// to return false here.
if (log.isDebugEnabled()) {
log.debug("Node {} sent a response indicate the request is throttled for {} ms.", node,
response.throttleTimeMs());
log.debug("Node {} sent a empty full fetch response to indicate that this " +
"client should be throttled for {} ms.", node, response.throttleTimeMs());
}
nextMetadata = FetchMetadata.INITIAL;
return false;
}

String problem = verifyFullFetchResponsePartitions(response);
if (problem != null) {
log.info("Node {} sent an invalid full fetch response with {}", node, problem);
Expand Down
13 changes: 7 additions & 6 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -990,23 +990,24 @@ class KafkaApis(val requestChannel: RequestChannel,
val bandwidthThrottleTimeMs = quotas.fetch.maybeRecordAndGetThrottleTimeMs(request, responseSize, timeMs)

val maxThrottleTimeMs = math.max(bandwidthThrottleTimeMs, requestThrottleTimeMs)

// [LIKAFKA-59133] We made a change here to actually fill in the data to the fetch response even when throttling happens.
// This prevents the consumers completely getting stuck when throttling happens intensively.
unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)

// [LIKAFKA-45345] even if the throttleTimeMs is 0, we still record it so that
// the throttle-time sensor does not expire before the byte-rate sensor in quotas.fetch
// or the request-time sensor in quotas.request.
val (effectiveBandwidthThrottleTime, effectiveRequestThrottleTime) = if (maxThrottleTimeMs > 0) {
request.apiThrottleTimeMs = maxThrottleTimeMs

// Even if we need to throttle for request quota violation, we should "unrecord" the already recorded value
// from the fetch quota because we are going to return an empty response.
quotas.fetch.unrecordQuotaSensor(request, responseSize, timeMs)
// If throttling is required, return an empty response.
unconvertedFetchResponse = fetchContext.getThrottledResponse(maxThrottleTimeMs)
if (bandwidthThrottleTimeMs > requestThrottleTimeMs) {
(bandwidthThrottleTimeMs, 0)
} else {
(0, requestThrottleTimeMs)
}
} else {
// Get the actual response. This will update the fetch context.
unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
trace(s"Sending Fetch response with partitions.size=$responseSize, metadata=${unconvertedFetchResponse.sessionId}")
(0, 0)
}
Expand Down

0 comments on commit fadb79f

Please sign in to comment.