Skip to content

Commit

Permalink
Perform strict bound checks inside operators rather than using queue.…
Browse files Browse the repository at this point in the history
…size() which is not constant-time
  • Loading branch information
jponge committed Oct 23, 2023
1 parent 594c645 commit 53d7c2c
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,18 +140,4 @@ public static <T> Queue<T> createMpscArrayQueue(int size) {
return new MpscAtomicArrayQueue<>(size);
}
}

/**
* Check when a non-strictly sized queue overflow.
*
* @param queue the queue
* @param limit the limit, a negative value assumes an unbounded queue
* @return {@code true} if the queue overflow, {@code false} otherwise
*/
public static boolean isOverflowing(Queue<?> queue, int limit) {
if (limit < 0) {
return false;
}
return queue.size() >= limit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import java.util.concurrent.atomic.AtomicInteger;

import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.helpers.queues.Queues;
import io.smallrye.mutiny.subscription.MultiEmitter;
import io.smallrye.mutiny.subscription.MultiSubscriber;

Expand All @@ -16,6 +15,7 @@ public class BufferItemMultiEmitter<T> extends BaseMultiEmitter<T> {
private Throwable failure;
private volatile boolean done;
private final AtomicInteger wip = new AtomicInteger();
private final AtomicInteger strictBoundCounter = new AtomicInteger();

BufferItemMultiEmitter(MultiSubscriber<? super T> actual, Queue<T> queue, int overflowBufferSize) {
super(actual);
Expand All @@ -33,7 +33,7 @@ public MultiEmitter<T> emit(T t) {
fail(new NullPointerException("`emit` called with `null`."));
return this;
}
if (queue.offer(t) && !Queues.isOverflowing(queue, overflowBufferSize)) {
if (queue.offer(t) && (overflowBufferSize == -1 || strictBoundCounter.incrementAndGet() < overflowBufferSize)) {
drain();
} else {
fail(new EmitterBufferOverflowException());
Expand Down Expand Up @@ -117,6 +117,9 @@ void drain() {
}

try {
if (overflowBufferSize != -1) {
strictBoundCounter.decrementAndGet();
}
downstream.onItem(o);
} catch (Throwable x) {
cancel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class OnOverflowBufferProcessor extends MultiOperatorProcessor<T, T> {

private final AtomicLong requested = new AtomicLong();
private final AtomicInteger wip = new AtomicInteger();
private final AtomicInteger strictBoundCounter = new AtomicInteger();

volatile boolean cancelled;
volatile boolean done;
Expand All @@ -70,7 +71,7 @@ public void onSubscribe(Subscription subscription) {

@Override
public void onItem(T t) {
if ((!unbounded && Queues.isOverflowing(queue, bufferSize)) || !queue.offer(t)) {
if ((!unbounded && strictBoundCounter.getAndIncrement() >= bufferSize) || !queue.offer(t)) {
BackPressureFailure bpf = new BackPressureFailure(
"The overflow buffer is full, which is due to the upstream sending too many items w.r.t. the downstream capacity and/or the downstream not consuming items fast enough");
if (dropUniMapper != null) {
Expand Down Expand Up @@ -167,6 +168,9 @@ void drain() {
if (wasEmpty) {
break;
}
if (!unbounded) {
strictBoundCounter.decrementAndGet();
}
downstream.onItem(item);
emitted++;
}
Expand Down

0 comments on commit 53d7c2c

Please sign in to comment.