diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiConcatMapOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiConcatMapOp.java index 5371de464..3676246cb 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiConcatMapOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiConcatMapOp.java @@ -100,6 +100,16 @@ public void onSubscribe(Flow.Subscription subscription) { } } + private void innerOnSubscribe(Flow.Subscription subscription) { + stateLock.lock(); + innerUpstream = subscription; + long n = demand; + stateLock.unlock(); + if (n > 0L) { + subscription.request(n); + } + } + @Override public void onItem(I item) { if (STATE_UPDATER.compareAndSet(this, State.PUBLISHER_REQUESTED, State.EMITTING)) { @@ -247,7 +257,9 @@ public void request(long n) { case EMITTING: case EMITTING_FINAL: stateLock.unlock(); - innerUpstream.request(n); + if (innerUpstream != null) { + innerUpstream.request(n); + } break; case READY: state = State.PUBLISHER_REQUESTED; @@ -282,16 +294,7 @@ private class InnerSubscriber implements MultiSubscriber, ContextSupport { @Override public void onSubscribe(Flow.Subscription subscription) { - stateLock.lock(); - innerUpstream = subscription; - try { - long n = demand; - if (n > 0L) { - subscription.request(n); - } - } finally { - stateLock.unlock(); - } + innerOnSubscribe(subscription); } @Override diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiConcatMapNoPrefetchTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiConcatMapNoPrefetchTest.java index d4b75c92a..727cc1054 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiConcatMapNoPrefetchTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiConcatMapNoPrefetchTest.java @@ -10,6 +10,7 @@ import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -348,4 +349,16 @@ void simpleConcatMap() { sub.assertItems(10, 20, 20, 40); sub.assertCompleted(); } + + @RepeatedTest(1000) + void earlyRequestWithNullInner() { + var sub = Multi.createFrom().items(1, 2, 3) + .emitOn(Infrastructure.getDefaultExecutor()) + .onItem().call(i -> Uni.createFrom().item(i)) + .subscribe().withSubscriber(AssertSubscriber.create()); + + sub.request(1); + sub.awaitNextItems(1, Duration.ofSeconds(1)); + sub.cancel(); + } }