diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiConcatMapOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiConcatMapOp.java index ef925be53..0b511cb21 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiConcatMapOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiConcatMapOp.java @@ -69,7 +69,7 @@ private enum State { private volatile State state = State.INIT; private volatile Subscription upstream; private Subscription currentUpstream; - private boolean upstreamHasCompleted = false; + private volatile boolean upstreamHasCompleted = false; private Throwable failure; private static final AtomicReferenceFieldUpdater UPSTREAM_UPDATER = AtomicReferenceFieldUpdater @@ -148,10 +148,10 @@ private Throwable addFailure(Throwable failure) { @Override public void onCompletion() { + upstreamHasCompleted = true; if (state == State.CANCELLED) { return; } - upstreamHasCompleted = true; if (STATE_UPDATER.compareAndSet(this, State.WAITING_NEXT_PUBLISHER, State.CANCELLED) || STATE_UPDATER.compareAndSet(this, State.INIT, State.CANCELLED)) { if (failure == null) { @@ -243,18 +243,18 @@ public void onCompletion() { if (state == State.CANCELLED) { return; } - if (!upstreamHasCompleted) { - state = State.WAITING_NEXT_PUBLISHER; - if (demand > 0L) { - upstream.request(1L); - } - } else { - state = State.CANCELLED; + state = State.WAITING_NEXT_PUBLISHER; + if (upstreamHasCompleted && STATE_UPDATER.compareAndSet(ConcatMapSubscriber.this, State.WAITING_NEXT_PUBLISHER, + State.CANCELLED)) { if (failure != null) { downstream.onFailure(failure); } else { downstream.onComplete(); } + } else { + if (demand > 0L) { + upstream.request(1L); + } } }