Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(operators)!: introduce a buffer size parameter to Multi::emitOn #1761

Merged
merged 1 commit into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion implementation/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,21 @@
"criticality" : "highlight",
"minSeverity" : "POTENTIALLY_BREAKING",
"minCriticality" : "documented",
"differences" : [ ]
"differences" : [
{
"ignore": true,
"code": "java.method.addedToInterface",
"new": "method io.smallrye.mutiny.Multi<T> io.smallrye.mutiny.Multi<T>::emitOn(java.util.concurrent.Executor, int)",
"justification": "The emitOn() internal buffer size must be configurable"
},
{
"ignore": true,
"code": "java.method.numberOfParametersChanged",
"old": "method void io.smallrye.mutiny.operators.multi.MultiEmitOnOp<T>::<init>(io.smallrye.mutiny.Multi<? extends T>, java.util.concurrent.Executor)",
"new": "method void io.smallrye.mutiny.operators.multi.MultiEmitOnOp<T>::<init>(io.smallrye.mutiny.Multi<? extends T>, java.util.concurrent.Executor, int)",
"justification": "The emitOn() internal buffer size must be configurable"
}
]
}
}, {
"extension" : "revapi.reporter.json",
Expand Down
31 changes: 31 additions & 0 deletions implementation/src/main/java/io/smallrye/mutiny/Multi.java
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ default <O> O stage(Function<Multi<T>, O> stage) {
* Produces a new {@link Multi} invoking the {@code onItem}, {@code onFailure} and {@code onCompletion} methods
* on the supplied {@link Executor}.
* <p>
* This operator delegates to {@link #emitOn(Executor, int)} with a default buffer size of
* {@link Infrastructure#getBufferSizeS()} items.
* <p>
* Instead of receiving the {@code item} event on the thread firing the event, this method influences the
* threading context to switch to a thread from the given executor. Same behavior for failure and completion.
* <p>
Expand All @@ -223,10 +226,38 @@ default <O> O stage(Function<Multi<T>, O> stage) {
*
* @param executor the executor to use, must not be {@code null}
* @return a new {@link Multi}
* @see #emitOn(Executor, int)
*/
@CheckReturnValue
Multi<T> emitOn(Executor executor);

/**
* Produces a new {@link Multi} invoking the {@code onItem}, {@code onFailure} and {@code onCompletion} methods
* on the supplied {@link Executor}.
* <p>
* This operator uses a queue of capacity {@code bufferSize} to emit items from a running executor thread,
* reducing the need for thread context switches when items are emitted fast from the upstream.
* <p>
* This operator tracks {@link Subscription#request(long)} demand, but it does not forward requests to the upstream.
* It instead requests {@code bufferSize} elements at subscription time and whenever {@code bufferSize} items have
* been emitted, allowing for efficient batching.
* <p>
* Instead of receiving the {@code item} event on the thread firing the event, this method influences the
* threading context to switch to a thread from the given executor. Same behavior for failure and completion.
* <p>
* Note that the subscriber is guaranteed to never be called concurrently.
* <p>
* <strong>Be careful as this operator can lead to concurrency problems with non thread-safe objects such as
* CDI request-scoped beans.
* It might also break reactive-streams semantics with items being emitted concurrently.</strong>
*
* @param executor the executor to use, must not be {@code null}
* @param bufferSize the buffer size, must be strictly positive
* @return a new {@link Multi}
*/
@CheckReturnValue
Multi<T> emitOn(Executor executor, int bufferSize);

/**
* When a subscriber subscribes to this {@link Multi}, execute the subscription to the upstream {@link Multi} on a
* thread from the given executor. As a result, the {@link Subscriber#onSubscribe(Subscription)} method will be called
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.smallrye.mutiny.groups.MultiSelect;
import io.smallrye.mutiny.groups.MultiSkip;
import io.smallrye.mutiny.groups.MultiSubscribe;
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.operators.multi.MultiCacheOp;
import io.smallrye.mutiny.operators.multi.MultiDemandCapping;
Expand Down Expand Up @@ -102,7 +103,16 @@ public Multi<T> cache() {

@Override
public Multi<T> emitOn(Executor executor) {
return Infrastructure.onMultiCreation(new MultiEmitOnOp<>(this, nonNull(executor, "executor")));
return emitOn(executor, Infrastructure.getBufferSizeS());
}

@Override
public Multi<T> emitOn(Executor executor, int bufferSize) {
return Infrastructure.onMultiCreation(
new MultiEmitOnOp<>(
this,
nonNull(executor, "executor"),
ParameterValidation.positive(bufferSize, "bufferSize")));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.helpers.queues.Queues;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.subscription.BackPressureFailure;
import io.smallrye.mutiny.subscription.MultiSubscriber;

Expand All @@ -27,24 +25,25 @@
public class MultiEmitOnOp<T> extends AbstractMultiOperator<T, T> {

private final Executor executor;
private final Supplier<? extends Queue<T>> queueSupplier = Queues.get(Infrastructure.getBufferSizeS());
private final int bufferSize;

public MultiEmitOnOp(Multi<? extends T> upstream, Executor executor) {
public MultiEmitOnOp(Multi<? extends T> upstream, Executor executor, int bufferSize) {
super(upstream);
this.executor = ParameterValidation.nonNull(executor, "executor");
this.executor = executor;
this.bufferSize = bufferSize;
}

@Override
public void subscribe(MultiSubscriber<? super T> downstream) {
ParameterValidation.nonNullNpe(downstream, "subscriber");
upstream.subscribe().withSubscriber(new MultiEmitOnProcessor<>(downstream, executor, queueSupplier));
upstream.subscribe().withSubscriber(new MultiEmitOnProcessor<>(downstream, executor, bufferSize));
}

static final class MultiEmitOnProcessor<T> extends MultiOperatorProcessor<T, T> implements Runnable {

private final Executor executor;

private final int limit;
private final int bufferSize;

// State variables

Expand Down Expand Up @@ -75,18 +74,18 @@ static final class MultiEmitOnProcessor<T> extends MultiOperatorProcessor<T, T>

MultiEmitOnProcessor(MultiSubscriber<? super T> downstream,
Executor executor,
Supplier<? extends Queue<T>> queueSupplier) {
int bufferSize) {
super(downstream);
this.executor = executor;
this.limit = 16;
this.queue = queueSupplier.get();
this.bufferSize = bufferSize;
this.queue = Queues.createMpscArrayQueue(bufferSize);
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
if (compareAndSetUpstreamSubscription(null, subscription)) {
downstream.onSubscribe(this);
subscription.request(16);
subscription.request(bufferSize);
} else {
subscription.cancel();
}
Expand Down Expand Up @@ -200,7 +199,7 @@ public void run() {

// updating the number of emitted items.
emitted++;
if (emitted == limit) {
if (emitted == bufferSize) {
if (requests != Long.MAX_VALUE) {
requests = requested.addAndGet(-emitted);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package io.smallrye.mutiny.operators;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.*;
import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.mock;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -46,6 +47,20 @@ public void testWithSequenceOfItems() {
.assertItems(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}

@RepeatedTest(10)
public void testWithSequenceOfItemsAndBufferSize() {
AtomicInteger requestSignalsCount = new AtomicInteger();
AssertSubscriber<Integer> subscriber = Multi.createFrom().items(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.onRequest().invoke(requestSignalsCount::incrementAndGet)
.emitOn(executor, 3)
.subscribe().withSubscriber(AssertSubscriber.create());

subscriber.request(Long.MAX_VALUE)
.awaitCompletion()
.assertItems(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
assertThat(requestSignalsCount.get()).isEqualTo(4);
}

@Test
public void testWithRequest0() {
AssertSubscriber<Integer> subscriber = Multi.createFrom().items(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
Expand Down Expand Up @@ -92,4 +107,18 @@ public void subscribe(MultiSubscriber<? super Integer> subscriber) {
subscriber.assertFailedWith(BackPressureFailure.class, "");
}

@Test
public void testBufferSizeValidation() {
Multi<Integer> multi = Multi.createFrom().items(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

assertThatThrownBy(() -> multi.emitOn(executor, -58))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("bufferSize");

assertThatThrownBy(() -> multi.emitOn(executor, 0))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("bufferSize");

assertThatCode(() -> multi.emitOn(executor, 58)).doesNotThrowAnyException();
}
}
Loading