diff --git a/implementation/revapi.json b/implementation/revapi.json index d37de1ac5..ad04bd2c2 100644 --- a/implementation/revapi.json +++ b/implementation/revapi.json @@ -51,7 +51,21 @@ "criticality" : "highlight", "minSeverity" : "POTENTIALLY_BREAKING", "minCriticality" : "documented", - "differences" : [ ] + "differences" : [ + { + "ignore": true, + "code": "java.method.addedToInterface", + "new": "method io.smallrye.mutiny.Multi io.smallrye.mutiny.Multi::emitOn(java.util.concurrent.Executor, int)", + "justification": "The emitOn() internal buffer size must be configurable" + }, + { + "ignore": true, + "code": "java.method.numberOfParametersChanged", + "old": "method void io.smallrye.mutiny.operators.multi.MultiEmitOnOp::(io.smallrye.mutiny.Multi, java.util.concurrent.Executor)", + "new": "method void io.smallrye.mutiny.operators.multi.MultiEmitOnOp::(io.smallrye.mutiny.Multi, java.util.concurrent.Executor, int)", + "justification": "The emitOn() internal buffer size must be configurable" + } + ] } }, { "extension" : "revapi.reporter.json", diff --git a/implementation/src/main/java/io/smallrye/mutiny/Multi.java b/implementation/src/main/java/io/smallrye/mutiny/Multi.java index b72901fd8..05e35151a 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/Multi.java +++ b/implementation/src/main/java/io/smallrye/mutiny/Multi.java @@ -212,6 +212,9 @@ default O stage(Function, O> stage) { * Produces a new {@link Multi} invoking the {@code onItem}, {@code onFailure} and {@code onCompletion} methods * on the supplied {@link Executor}. *

+ * This operator delegates to {@link #emitOn(Executor, int)} with a default buffer size of + * {@link Infrastructure#getBufferSizeS()} items. + *

* Instead of receiving the {@code item} event on the thread firing the event, this method influences the * threading context to switch to a thread from the given executor. Same behavior for failure and completion. *

@@ -223,10 +226,38 @@ default O stage(Function, O> stage) { * * @param executor the executor to use, must not be {@code null} * @return a new {@link Multi} + * @see #emitOn(Executor, int) */ @CheckReturnValue Multi emitOn(Executor executor); + /** + * Produces a new {@link Multi} invoking the {@code onItem}, {@code onFailure} and {@code onCompletion} methods + * on the supplied {@link Executor}. + *

+ * This operator uses a queue of capacity {@code bufferSize} to emit items from a running executor thread, + * reducing the need for thread context switches when items are emitted fast from the upstream. + *

+ * This operator tracks {@link Subscription#request(long)} demand, but it does not forward requests to the upstream. + * It instead requests {@code bufferSize} elements at subscription time and whenever {@code bufferSize} items have + * been emitted, allowing for efficient batching. + *

+ * Instead of receiving the {@code item} event on the thread firing the event, this method influences the + * threading context to switch to a thread from the given executor. Same behavior for failure and completion. + *

+ * Note that the subscriber is guaranteed to never be called concurrently. + *

+ * Be careful as this operator can lead to concurrency problems with non thread-safe objects such as + * CDI request-scoped beans. + * It might also break reactive-streams semantics with items being emitted concurrently. + * + * @param executor the executor to use, must not be {@code null} + * @param bufferSize the buffer size, must be strictly positive + * @return a new {@link Multi} + */ + @CheckReturnValue + Multi emitOn(Executor executor, int bufferSize); + /** * When a subscriber subscribes to this {@link Multi}, execute the subscription to the upstream {@link Multi} on a * thread from the given executor. As a result, the {@link Subscriber#onSubscribe(Subscription)} method will be called diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractMulti.java b/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractMulti.java index 87d61e074..3a7d600c3 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractMulti.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractMulti.java @@ -29,6 +29,7 @@ import io.smallrye.mutiny.groups.MultiSelect; import io.smallrye.mutiny.groups.MultiSkip; import io.smallrye.mutiny.groups.MultiSubscribe; +import io.smallrye.mutiny.helpers.ParameterValidation; import io.smallrye.mutiny.infrastructure.Infrastructure; import io.smallrye.mutiny.operators.multi.MultiCacheOp; import io.smallrye.mutiny.operators.multi.MultiDemandCapping; @@ -102,7 +103,16 @@ public Multi cache() { @Override public Multi emitOn(Executor executor) { - return Infrastructure.onMultiCreation(new MultiEmitOnOp<>(this, nonNull(executor, "executor"))); + return emitOn(executor, Infrastructure.getBufferSizeS()); + } + + @Override + public Multi emitOn(Executor executor, int bufferSize) { + return Infrastructure.onMultiCreation( + new MultiEmitOnOp<>( + this, + nonNull(executor, "executor"), + ParameterValidation.positive(bufferSize, "bufferSize"))); } @Override diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiEmitOnOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiEmitOnOp.java index 112b5c986..fa2b26554 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiEmitOnOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiEmitOnOp.java @@ -9,13 +9,11 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Supplier; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.helpers.ParameterValidation; import io.smallrye.mutiny.helpers.Subscriptions; import io.smallrye.mutiny.helpers.queues.Queues; -import io.smallrye.mutiny.infrastructure.Infrastructure; import io.smallrye.mutiny.subscription.BackPressureFailure; import io.smallrye.mutiny.subscription.MultiSubscriber; @@ -27,24 +25,25 @@ public class MultiEmitOnOp extends AbstractMultiOperator { private final Executor executor; - private final Supplier> queueSupplier = Queues.get(Infrastructure.getBufferSizeS()); + private final int bufferSize; - public MultiEmitOnOp(Multi upstream, Executor executor) { + public MultiEmitOnOp(Multi upstream, Executor executor, int bufferSize) { super(upstream); - this.executor = ParameterValidation.nonNull(executor, "executor"); + this.executor = executor; + this.bufferSize = bufferSize; } @Override public void subscribe(MultiSubscriber downstream) { ParameterValidation.nonNullNpe(downstream, "subscriber"); - upstream.subscribe().withSubscriber(new MultiEmitOnProcessor<>(downstream, executor, queueSupplier)); + upstream.subscribe().withSubscriber(new MultiEmitOnProcessor<>(downstream, executor, bufferSize)); } static final class MultiEmitOnProcessor extends MultiOperatorProcessor implements Runnable { private final Executor executor; - private final int limit; + private final int bufferSize; // State variables @@ -75,18 +74,18 @@ static final class MultiEmitOnProcessor extends MultiOperatorProcessor MultiEmitOnProcessor(MultiSubscriber downstream, Executor executor, - Supplier> queueSupplier) { + int bufferSize) { super(downstream); this.executor = executor; - this.limit = 16; - this.queue = queueSupplier.get(); + this.bufferSize = bufferSize; + this.queue = Queues.createMpscArrayQueue(bufferSize); } @Override public void onSubscribe(Flow.Subscription subscription) { if (compareAndSetUpstreamSubscription(null, subscription)) { downstream.onSubscribe(this); - subscription.request(16); + subscription.request(bufferSize); } else { subscription.cancel(); } @@ -200,7 +199,7 @@ public void run() { // updating the number of emitted items. emitted++; - if (emitted == limit) { + if (emitted == bufferSize) { if (requests != Long.MAX_VALUE) { requests = requested.addAndGet(-emitted); } diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiEmitOnTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiEmitOnTest.java index 161c1478b..28fd8e966 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiEmitOnTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiEmitOnTest.java @@ -1,6 +1,6 @@ package io.smallrye.mutiny.operators; -import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.*; import static org.awaitility.Awaitility.await; import static org.mockito.Mockito.mock; @@ -8,6 +8,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Flow; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -46,6 +47,20 @@ public void testWithSequenceOfItems() { .assertItems(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); } + @RepeatedTest(10) + public void testWithSequenceOfItemsAndBufferSize() { + AtomicInteger requestSignalsCount = new AtomicInteger(); + AssertSubscriber subscriber = Multi.createFrom().items(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + .onRequest().invoke(requestSignalsCount::incrementAndGet) + .emitOn(executor, 3) + .subscribe().withSubscriber(AssertSubscriber.create()); + + subscriber.request(Long.MAX_VALUE) + .awaitCompletion() + .assertItems(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + assertThat(requestSignalsCount.get()).isEqualTo(4); + } + @Test public void testWithRequest0() { AssertSubscriber subscriber = Multi.createFrom().items(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) @@ -92,4 +107,18 @@ public void subscribe(MultiSubscriber subscriber) { subscriber.assertFailedWith(BackPressureFailure.class, ""); } + @Test + public void testBufferSizeValidation() { + Multi multi = Multi.createFrom().items(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + + assertThatThrownBy(() -> multi.emitOn(executor, -58)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("bufferSize"); + + assertThatThrownBy(() -> multi.emitOn(executor, 0)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("bufferSize"); + + assertThatCode(() -> multi.emitOn(executor, 58)).doesNotThrowAnyException(); + } }