diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 42e378e996996..3dc0292503bfd 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -2134,6 +2134,44 @@ public void testExpiredBatchesInMultiplePartitions() throws Exception { } } + @Test + public void testExpiredBatchesInMultiplePartitions() throws Exception { + long deliveryTimeoutMs = 1500L; + setupWithTransactionState(null, true, null); + + // Send multiple ProduceRequest across multiple partitions. + Future request1 = accumulator.append(tp0, time.milliseconds(), "k1".getBytes(), "v1".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; + Future request2 = accumulator.append(tp1, time.milliseconds(), "k2".getBytes(), "v2".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; + + // Send request. + sender.run(time.milliseconds()); + assertEquals(1, client.inFlightRequestCount()); + assertEquals("Expect one in-flight batch in accumulator", 1, sender.inFlightBatches(tp0).size()); + + Map responseMap = new HashMap<>(); + responseMap.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L)); + client.respond(new ProduceResponse(responseMap)); + + // Successfully expire both batches. + time.sleep(deliveryTimeoutMs); + sender.run(time.milliseconds()); + assertEquals("Expect zero in-flight batch in accumulator", 0, sender.inFlightBatches(tp0).size()); + + try { + request1.get(); + fail("The expired batch should throw a TimeoutException"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof TimeoutException); + } + + try { + request2.get(); + fail("The expired batch should throw a TimeoutException"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof TimeoutException); + } + } + private class MatchingBufferPool extends BufferPool { IdentityHashMap allocatedBuffers;