From 3e01173bda56f37615fefe2751f10b6ab69902f4 Mon Sep 17 00:00:00 2001 From: Klaasjan te Voortwis Date: Thu, 4 Mar 2021 17:02:28 +0100 Subject: [PATCH] keep pushback intack on size+duration In the previous implementation the logic was partly handled by calling intoMultis().every(Duration). This was causing to get an unlimited number of items from upstream. When processing a kafka stream with many small messages fitting in memory, the throttled policy would eventually (60seconds) see 'stale' non-processed messages causing an exception shutting processing down completely. This problem is solved using the MultiBufferWithTimeoutOp directly. --- .../java/io/smallrye/mutiny/groups/MultiGroupIntoLists.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/implementation/src/main/java/io/smallrye/mutiny/groups/MultiGroupIntoLists.java b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiGroupIntoLists.java index 0274c3d0e..9d27a27a6 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/groups/MultiGroupIntoLists.java +++ b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiGroupIntoLists.java @@ -102,7 +102,7 @@ public Multi> of(int size, int skip) { * @return a Multi emitting lists of at most {@code size} items from the upstream Multi. */ public Multi> of(int size, Duration maximumDelay) { - return upstream.group().intoMultis().every(maximumDelay) - .flatMap(withTimeout -> withTimeout.group().intoLists().of(size)); + return Infrastructure.onMultiCreation(new MultiBufferWithTimeoutOp<>(upstream, positive(size, "size"), + validate(maximumDelay, "maximumDelay"), Infrastructure.getDefaultWorkerPool())); } }