Skip to content

Commit

Permalink
[LI-CHERRY-PICK] [c050503] KAFKA-7709: Fix ConcurrentModificationExce…
Browse files Browse the repository at this point in the history
…ption when retrieving expired inflight batches on multiple partitions. (apache#6005)

TICKET = KAFKA-7709
LI_DESCRIPTION = proactive cherrypick
EXIT_CRITERIA = HASH [c050503]
ORIGINAL_DESCRIPTION =

Reviewers: Dhruvil Shah <[email protected]>, Guozhang Wang <[email protected]>
(cherry picked from commit c050503)
  • Loading branch information
markcho authored and Sean Mccauliff committed Jun 13, 2019
1 parent 9faf4f6 commit 264a2ea
Showing 1 changed file with 38 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2134,6 +2134,44 @@ public void testExpiredBatchesInMultiplePartitions() throws Exception {
}
}

@Test
public void testExpiredBatchesInMultiplePartitions() throws Exception {
long deliveryTimeoutMs = 1500L;
setupWithTransactionState(null, true, null);

// Send multiple ProduceRequest across multiple partitions.
Future<RecordMetadata> request1 = accumulator.append(tp0, time.milliseconds(), "k1".getBytes(), "v1".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
Future<RecordMetadata> request2 = accumulator.append(tp1, time.milliseconds(), "k2".getBytes(), "v2".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;

// Send request.
sender.run(time.milliseconds());
assertEquals(1, client.inFlightRequestCount());
assertEquals("Expect one in-flight batch in accumulator", 1, sender.inFlightBatches(tp0).size());

Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
responseMap.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L));
client.respond(new ProduceResponse(responseMap));

// Successfully expire both batches.
time.sleep(deliveryTimeoutMs);
sender.run(time.milliseconds());
assertEquals("Expect zero in-flight batch in accumulator", 0, sender.inFlightBatches(tp0).size());

try {
request1.get();
fail("The expired batch should throw a TimeoutException");
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof TimeoutException);
}

try {
request2.get();
fail("The expired batch should throw a TimeoutException");
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof TimeoutException);
}
}

private class MatchingBufferPool extends BufferPool {
IdentityHashMap<ByteBuffer, Boolean> allocatedBuffers;

Expand Down

0 comments on commit 264a2ea

Please sign in to comment.