Skip to content

Commit

Permalink
keep pushback intack on size+duration
Browse files Browse the repository at this point in the history
In the previous implementation the logic was partly handled by calling intoMultis().every(Duration). This was causing to get an unlimited number of items from upstream.
When processing a kafka stream with many small messages fitting in memory, the throttled policy would eventually (60seconds) see 'stale' non-processed messages causing an exception shutting processing down completely. This problem is solved using the MultiBufferWithTimeoutOp directly.
  • Loading branch information
voortwis authored Mar 4, 2021
1 parent f8bb371 commit 3e01173
Showing 1 changed file with 2 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public Multi<List<T>> of(int size, int skip) {
* @return a Multi emitting lists of at most {@code size} items from the upstream Multi.
*/
public Multi<List<T>> of(int size, Duration maximumDelay) {
return upstream.group().intoMultis().every(maximumDelay)
.flatMap(withTimeout -> withTimeout.group().intoLists().of(size));
return Infrastructure.onMultiCreation(new MultiBufferWithTimeoutOp<>(upstream, positive(size, "size"),
validate(maximumDelay, "maximumDelay"), Infrastructure.getDefaultWorkerPool()));
}
}

0 comments on commit 3e01173

Please sign in to comment.