diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiConcatMapOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiConcatMapOp.java index ef925be53..6c6812659 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiConcatMapOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiConcatMapOp.java @@ -69,7 +69,8 @@ private enum State { private volatile State state = State.INIT; private volatile Subscription upstream; private Subscription currentUpstream; - private boolean upstreamHasCompleted = false; + private volatile boolean upstreamHasCompleted = false; + private volatile boolean innerHasCompleted = false; private Throwable failure; private static final AtomicReferenceFieldUpdater UPSTREAM_UPDATER = AtomicReferenceFieldUpdater @@ -153,7 +154,8 @@ public void onCompletion() { } upstreamHasCompleted = true; if (STATE_UPDATER.compareAndSet(this, State.WAITING_NEXT_PUBLISHER, State.CANCELLED) - || STATE_UPDATER.compareAndSet(this, State.INIT, State.CANCELLED)) { + || STATE_UPDATER.compareAndSet(this, State.INIT, State.CANCELLED) + || innerHasCompleted) { if (failure == null) { downstream.onCompletion(); } else { @@ -205,11 +207,11 @@ public void onSubscribe(Subscription subscription) { if (state == State.CANCELLED) { return; } + innerHasCompleted = false; currentUpstream = subscription; state = State.EMITTING; - long pending = demand; - if (pending > 0L) { - currentUpstream.request(pending); + if (demand > 0L) { + currentUpstream.request(demand); } } @@ -243,6 +245,7 @@ public void onCompletion() { if (state == State.CANCELLED) { return; } + innerHasCompleted = true; if (!upstreamHasCompleted) { state = State.WAITING_NEXT_PUBLISHER; if (demand > 0L) {