Skip to content

Commit

Permalink
KAFKA-9364: Fix misleading consumer logs on throttling (apache#7894) (#…
Browse files Browse the repository at this point in the history
…517)

When the consumer's fetch request is throttled by the KIP-219 mechanism,
it receives an empty fetch response.  The consumer should not log this
as an error.

Reviewers: Jason Gustafson <[email protected]>

Co-authored-by: Colin Patrick McCabe <[email protected]>
  • Loading branch information
Yellow-Rice and cmccabe authored Jul 18, 2024
1 parent 2720a23 commit 5f9845f
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -300,11 +300,11 @@ static Set<TopicPartition> findMissing(Set<TopicPartition> toFind, Set<TopicPart
* @param response The response.
* @return True if the full fetch response partitions are valid.
*/
private String verifyFullFetchResponsePartitions(FetchResponse<?> response) {
String verifyFullFetchResponsePartitions(FetchResponse<?> response) {
StringBuilder bld = new StringBuilder();
Set<TopicPartition> omitted =
findMissing(response.responseData().keySet(), sessionPartitions.keySet());
Set<TopicPartition> extra =
findMissing(response.responseData().keySet(), sessionPartitions.keySet());
Set<TopicPartition> omitted =
findMissing(sessionPartitions.keySet(), response.responseData().keySet());
if (!omitted.isEmpty()) {
bld.append("omitted=(").append(Utils.join(omitted, ", ")).append(", ");
Expand All @@ -313,7 +313,7 @@ private String verifyFullFetchResponsePartitions(FetchResponse<?> response) {
bld.append("extra=(").append(Utils.join(extra, ", ")).append(", ");
}
if ((!omitted.isEmpty()) || (!extra.isEmpty())) {
bld.append("response=(").append(Utils.join(response.responseData().keySet(), ", "));
bld.append("response=(").append(Utils.join(response.responseData().keySet(), ", ")).append(")");
return bld.toString();
}
return null;
Expand All @@ -325,7 +325,7 @@ private String verifyFullFetchResponsePartitions(FetchResponse<?> response) {
* @param response The response.
* @return True if the incremental fetch response partitions are valid.
*/
private String verifyIncrementalFetchResponsePartitions(FetchResponse<?> response) {
String verifyIncrementalFetchResponsePartitions(FetchResponse<?> response) {
Set<TopicPartition> extra =
findMissing(response.responseData().keySet(), sessionPartitions.keySet());
if (!extra.isEmpty()) {
Expand Down Expand Up @@ -394,7 +394,22 @@ public boolean handleResponse(FetchResponse<?> response) {
nextMetadata = nextMetadata.nextCloseExisting();
}
return false;
} else if (nextMetadata.isFull()) {
}
if (nextMetadata.isFull()) {
if (response.responseData().isEmpty() && response.throttleTimeMs() > 0) {
// Normally, an empty full fetch response would be invalid. However, KIP-219
// specifies that if the broker wants to throttle the client, it will respond
// to a full fetch request with an empty response and a throttleTimeMs
// value set. We don't want to log this with a warning, since it's not an error.
// However, the empty full fetch response can't be processed, so it's still appropriate
// to return false here.
if (log.isDebugEnabled()) {
log.debug("Node {} sent a empty full fetch response to indicate that this " +
"client should be throttled for {} ms.", node, response.throttleTimeMs());
}
nextMetadata = FetchMetadata.INITIAL;
return false;
}
String problem = verifyFullFetchResponsePartitions(response);
if (problem != null) {
log.info("Node {} sent an invalid full fetch response with {}", node, problem);
Expand Down Expand Up @@ -428,9 +443,12 @@ public boolean handleResponse(FetchResponse<?> response) {
return true;
} else {
// The incremental fetch session was continued by the server.
// We don't have to do anything special here to support KIP-219, since an empty incremental
// fetch request is perfectly valid.
if (log.isDebugEnabled())
log.debug("Node {} sent an incremental fetch response for session {}{}",
node, response.sessionId(), responseDataToLogString(response));
log.debug("Node {} sent an incremental fetch response with throttleTimeMs = {} " +
"for session {}{}", response.throttleTimeMs(), node, response.sessionId(),
responseDataToLogString(response));
nextMetadata = nextMetadata.nextIncremental();
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,4 +353,36 @@ public void testIncrementalPartitionRemoval() {
assertMapsEqual(reqMap(new ReqEntry("foo", 0, 0, 100, 200)),
data3.sessionPartitions(), data3.toSend());
}

@Test
public void testVerifyFullFetchResponsePartitions() throws Exception {
FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
String issue = handler.verifyFullFetchResponsePartitions(new FetchResponse<>(Errors.NONE,
respMap(new RespEntry("foo", 0, 10, 20),
new RespEntry("foo", 1, 10, 20),
new RespEntry("bar", 0, 10, 20)),
0, INVALID_SESSION_ID));
assertTrue(issue.contains("extra"));
assertFalse(issue.contains("omitted"));
FetchSessionHandler.Builder builder = handler.newBuilder();
builder.add(new TopicPartition("foo", 0),
new FetchRequest.PartitionData(0, 100, 200, Optional.empty()));
builder.add(new TopicPartition("foo", 1),
new FetchRequest.PartitionData(10, 110, 210, Optional.empty()));
builder.add(new TopicPartition("bar", 0),
new FetchRequest.PartitionData(20, 120, 220, Optional.empty()));
builder.build();
String issue2 = handler.verifyFullFetchResponsePartitions(new FetchResponse<>(Errors.NONE,
respMap(new RespEntry("foo", 0, 10, 20),
new RespEntry("foo", 1, 10, 20),
new RespEntry("bar", 0, 10, 20)),
0, INVALID_SESSION_ID));
assertTrue(issue2 == null);
String issue3 = handler.verifyFullFetchResponsePartitions(new FetchResponse<>(Errors.NONE,
respMap(new RespEntry("foo", 0, 10, 20),
new RespEntry("foo", 1, 10, 20)),
0, INVALID_SESSION_ID));
assertFalse(issue3.contains("extra"));
assertTrue(issue3.contains("omitted"));
}
}

0 comments on commit 5f9845f

Please sign in to comment.