Skip to content

Commit

Permalink
Merge pull request #1499 from smallrye/fix/concatmap-early-null-inner…
Browse files Browse the repository at this point in the history
…Upstream

fix: concatMap early first upstream issues
  • Loading branch information
jponge authored Jan 26, 2024
2 parents 962d18b + f18296b commit 4ff850a
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ public void onSubscribe(Flow.Subscription subscription) {
}
}

private void innerOnSubscribe(Flow.Subscription subscription) {
stateLock.lock();
innerUpstream = subscription;
long n = demand;
stateLock.unlock();
if (n > 0L) {
subscription.request(n);
}
}

@Override
public void onItem(I item) {
if (STATE_UPDATER.compareAndSet(this, State.PUBLISHER_REQUESTED, State.EMITTING)) {
Expand Down Expand Up @@ -247,7 +257,9 @@ public void request(long n) {
case EMITTING:
case EMITTING_FINAL:
stateLock.unlock();
innerUpstream.request(n);
if (innerUpstream != null) {
innerUpstream.request(n);
}
break;
case READY:
state = State.PUBLISHER_REQUESTED;
Expand Down Expand Up @@ -282,16 +294,7 @@ private class InnerSubscriber implements MultiSubscriber<O>, ContextSupport {

@Override
public void onSubscribe(Flow.Subscription subscription) {
stateLock.lock();
innerUpstream = subscription;
try {
long n = demand;
if (n > 0L) {
subscription.request(n);
}
} finally {
stateLock.unlock();
}
innerOnSubscribe(subscription);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.stream.Stream;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
Expand Down Expand Up @@ -348,4 +349,16 @@ void simpleConcatMap() {
sub.assertItems(10, 20, 20, 40);
sub.assertCompleted();
}

@RepeatedTest(1000)
void earlyRequestWithNullInner() {
var sub = Multi.createFrom().items(1, 2, 3)
.emitOn(Infrastructure.getDefaultExecutor())
.onItem().call(i -> Uni.createFrom().item(i))
.subscribe().withSubscriber(AssertSubscriber.create());

sub.request(1);
sub.awaitNextItems(1, Duration.ofSeconds(1));
sub.cancel();
}
}

0 comments on commit 4ff850a

Please sign in to comment.