Skip to content

Commit

Permalink
fix: avoid a race condition in the new concatMap operator
Browse files Browse the repository at this point in the history
Avoids a race-condition between the inner and outer subscribers on the
completion signals.

Co-authored-by: Ozan Gunalp <[email protected]>
  • Loading branch information
jponge and ozangunalp committed Dec 13, 2023
1 parent cdbd2eb commit bc71eb8
Showing 1 changed file with 8 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ private enum State {
private volatile State state = State.INIT;
private volatile Subscription upstream;
private Subscription currentUpstream;
private boolean upstreamHasCompleted = false;
private volatile boolean upstreamHasCompleted = false;
private volatile boolean innerHasCompleted = false;
private Throwable failure;

private static final AtomicReferenceFieldUpdater<ConcatMapSubscriber, Subscription> UPSTREAM_UPDATER = AtomicReferenceFieldUpdater
Expand Down Expand Up @@ -153,7 +154,8 @@ public void onCompletion() {
}
upstreamHasCompleted = true;
if (STATE_UPDATER.compareAndSet(this, State.WAITING_NEXT_PUBLISHER, State.CANCELLED)
|| STATE_UPDATER.compareAndSet(this, State.INIT, State.CANCELLED)) {
|| STATE_UPDATER.compareAndSet(this, State.INIT, State.CANCELLED)
|| innerHasCompleted) {
if (failure == null) {
downstream.onCompletion();
} else {
Expand Down Expand Up @@ -205,11 +207,11 @@ public void onSubscribe(Subscription subscription) {
if (state == State.CANCELLED) {
return;
}
innerHasCompleted = false;
currentUpstream = subscription;
state = State.EMITTING;
long pending = demand;
if (pending > 0L) {
currentUpstream.request(pending);
if (demand > 0L) {
currentUpstream.request(demand);
}
}

Expand Down Expand Up @@ -243,6 +245,7 @@ public void onCompletion() {
if (state == State.CANCELLED) {
return;
}
innerHasCompleted = true;
if (!upstreamHasCompleted) {
state = State.WAITING_NEXT_PUBLISHER;
if (demand > 0L) {
Expand Down

0 comments on commit bc71eb8

Please sign in to comment.