From dded2d6747b3b64a98aaae2209b4e94790cb0cff Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Tue, 16 Jan 2024 22:43:28 +0100 Subject: [PATCH] fix: rewrite the concatMap operator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In this implementation the operator acts as a simple no-queue forwarder between the subscriber and the current upstream. There is also a small dose of fine-grained locking around a few state machine updates that can’t be expressed as non-blocking / compare & swap operations. --- implementation/revapi.json | 9 +- .../smallrye/mutiny/groups/MultiOnItem.java | 8 +- .../operators/multi/MultiConcatMapOp.java | 392 +++++++++--------- .../multi/MultiConcatMapNoPrefetchTest.java | 25 ++ 4 files changed, 239 insertions(+), 195 deletions(-) diff --git a/implementation/revapi.json b/implementation/revapi.json index d37de1ac5..c045f46fe 100644 --- a/implementation/revapi.json +++ b/implementation/revapi.json @@ -51,7 +51,14 @@ "criticality" : "highlight", "minSeverity" : "POTENTIALLY_BREAKING", "minCriticality" : "documented", - "differences" : [ ] + "differences" : [ + { + "ignore": true, + "code": "java.class.removed", + "old": "class io.smallrye.mutiny.operators.multi.MultiConcatMapOp.ConcatMapMainSubscriber", + "justification": "Refactoring of internal APIs" + } + ] } }, { "extension" : "revapi.reporter.json", diff --git a/implementation/src/main/java/io/smallrye/mutiny/groups/MultiOnItem.java b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiOnItem.java index 31e7428b4..f0c09312b 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/groups/MultiOnItem.java +++ b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiOnItem.java @@ -264,7 +264,13 @@ public Multi transformToIterable(Function MultiFlatten transformToUni(Function> mapper) { Function> actual = Infrastructure.decorate(nonNull(mapper, "mapper")); - Function> wrapper = res -> actual.apply(res).toMulti(); + Function> wrapper = res -> { + Uni uni = actual.apply(res); + if (uni == null) { + return Multi.createFrom().failure(new NullPointerException(MAPPER_RETURNED_NULL)); + } + return uni.toMulti(); + }; return new MultiFlatten<>(upstream, wrapper, 1, false); } diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiConcatMapOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiConcatMapOp.java index eb23519a9..0161a7757 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiConcatMapOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiConcatMapOp.java @@ -1,21 +1,21 @@ package io.smallrye.mutiny.operators.multi; +import static java.util.Objects.requireNonNull; + +import java.util.concurrent.Flow; import java.util.concurrent.Flow.Publisher; -import java.util.concurrent.Flow.Subscription; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; +import io.smallrye.mutiny.CompositeException; import io.smallrye.mutiny.Context; import io.smallrye.mutiny.Multi; -import io.smallrye.mutiny.helpers.ParameterValidation; import io.smallrye.mutiny.helpers.Subscriptions; import io.smallrye.mutiny.infrastructure.Infrastructure; import io.smallrye.mutiny.subscription.ContextSupport; import io.smallrye.mutiny.subscription.MultiSubscriber; -import io.smallrye.mutiny.subscription.SwitchableSubscriptionSubscriber; /** * ConcatMap operator without prefetching items from the upstream. @@ -25,7 +25,7 @@ *
  • The inner has no more outstanding requests.
  • *
  • The inner completed without emitting items or with outstanding requests.
  • * - * + *

    * This operator can collect failures and postpone them until termination. * * @param the upstream value type / input type @@ -50,203 +50,221 @@ public void subscribe(MultiSubscriber subscriber) { if (subscriber == null) { throw new NullPointerException("The subscriber must not be `null`"); } - ConcatMapMainSubscriber sub = new ConcatMapMainSubscriber<>(subscriber, - mapper, - postponeFailurePropagation); - + MainSubscriber sub = new MainSubscriber<>(mapper, postponeFailurePropagation, subscriber); upstream.subscribe(Infrastructure.onMultiSubscription(upstream, sub)); } - public static final class ConcatMapMainSubscriber implements MultiSubscriber, Subscription, ContextSupport { - - private static final int STATE_NEW = 0; // no request yet -- send first upstream request at this state - private static final int STATE_READY = 1; // first upstream request done, ready to receive items - private static final int STATE_EMITTING = 2; // received item from the upstream, subscribed to the inner - private static final int STATE_OUTER_TERMINATED = 3; // outer terminated, waiting for the inner to terminate - private static final int STATE_TERMINATED = 4; // inner and outer terminated - private static final int STATE_CANCELLED = 5; // cancelled - final AtomicInteger state = new AtomicInteger(STATE_NEW); + private enum State { + INIT, + READY, + PUBLISHER_REQUESTED, + EMITTING, + EMITTING_FINAL, + DONE + } - final MultiSubscriber downstream; - final Function> mapper; - private final boolean delayError; + private static class MainSubscriber implements MultiSubscriber, Flow.Subscription, ContextSupport { - final AtomicReference failures = new AtomicReference<>(); + private final Function> mapper; + private final boolean postponeFailurePropagation; + private final MultiSubscriber downstream; - volatile Subscription upstream = null; - private static final AtomicReferenceFieldUpdater UPSTREAM_UPDATER = AtomicReferenceFieldUpdater - .newUpdater(ConcatMapMainSubscriber.class, Subscription.class, "upstream"); + private volatile State state = State.INIT; + private static final AtomicReferenceFieldUpdater STATE_UPDATER = AtomicReferenceFieldUpdater + .newUpdater(MainSubscriber.class, State.class, "state"); - final ConcatMapInner inner; + private volatile long demand = 0L; + private static final AtomicLongFieldUpdater DEMAND_UPDATER = AtomicLongFieldUpdater + .newUpdater(MainSubscriber.class, "demand"); - private final AtomicBoolean deferredUpstreamRequest = new AtomicBoolean(false); + private final InnerSubscriber innerSubscriber = new InnerSubscriber(); + private final ReentrantLock stateLock = new ReentrantLock(); + private volatile Throwable failure; + private Flow.Subscription mainUpstream; + private volatile Flow.Subscription innerUpstream; - ConcatMapMainSubscriber( - MultiSubscriber downstream, - Function> mapper, - boolean delayError) { - this.downstream = downstream; + private MainSubscriber(Function> mapper, boolean postponeFailurePropagation, + MultiSubscriber downstream) { this.mapper = mapper; - this.delayError = delayError; - this.inner = new ConcatMapInner<>(this); + this.postponeFailurePropagation = postponeFailurePropagation; + this.downstream = downstream; } @Override - public void request(long n) { - if (n > 0) { - if (state.compareAndSet(STATE_NEW, STATE_READY)) { - upstream.request(1); - } - if (deferredUpstreamRequest.compareAndSet(true, false)) { - upstream.request(1); - } - inner.request(n); - if (inner.requested() != 0L && deferredUpstreamRequest.compareAndSet(true, false)) { - upstream.request(1); - } + public void onSubscribe(Flow.Subscription subscription) { + if (STATE_UPDATER.compareAndSet(this, State.INIT, State.READY)) { + mainUpstream = subscription; + downstream.onSubscribe(this); } else { - downstream.onFailure(Subscriptions.getInvalidRequestException()); + subscription.cancel(); } } @Override - public void cancel() { - while (true) { - int state = this.state.get(); - if (state == STATE_CANCELLED) { - return; - } - if (this.state.compareAndSet(state, STATE_CANCELLED)) { - if (state == STATE_OUTER_TERMINATED) { - inner.cancel(); - } else { - inner.cancel(); - upstream.cancel(); - } - return; + public void onItem(I item) { + if (STATE_UPDATER.compareAndSet(this, State.PUBLISHER_REQUESTED, State.EMITTING)) { + try { + Publisher publisher = requireNonNull(mapper.apply(item), + "The mapper produced a null publisher"); + publisher.subscribe(innerSubscriber); + } catch (Throwable err) { + state = State.DONE; + mainUpstream.cancel(); + downstream.onFailure(addFailure(err)); } } } - @Override - public void onSubscribe(Subscription subscription) { - if (UPSTREAM_UPDATER.compareAndSet(this, null, subscription)) { - downstream.onSubscribe(this); + private void innerOnItem(O item) { + if (state != State.DONE) { + DEMAND_UPDATER.decrementAndGet(this); + downstream.onItem(item); } } @Override - public void onItem(I item) { - if (!state.compareAndSet(STATE_READY, STATE_EMITTING)) { - return; - } - - try { - Publisher p = mapper.apply(item); - if (p == null) { - throw new NullPointerException(ParameterValidation.MAPPER_RETURNED_NULL); - } - - p.subscribe(inner); - } catch (Throwable e) { - if (postponeFailure(e, upstream)) { - innerComplete(0L); + public void onFailure(Throwable failure) { + if (STATE_UPDATER.getAndSet(this, State.DONE) != State.DONE) { + if (innerUpstream != null) { + innerUpstream.cancel(); } + downstream.onFailure(addFailure(failure)); + } else { + Infrastructure.handleDroppedException(failure); } } - @Override - public void onFailure(Throwable t) { - if (postponeFailure(t, inner)) { - onCompletion(); + private void innerOnFailure(Throwable failure) { + Throwable throwable = addFailure(failure); + stateLock.lock(); + switch (state) { + case EMITTING: + if (postponeFailurePropagation) { + if (demand > 0L) { + state = State.PUBLISHER_REQUESTED; + stateLock.unlock(); + mainUpstream.request(1L); + } else { + state = State.READY; + stateLock.unlock(); + } + } else { + state = State.DONE; + stateLock.unlock(); + mainUpstream.cancel(); + downstream.onFailure(throwable); + } + break; + case EMITTING_FINAL: + state = State.DONE; + stateLock.unlock(); + mainUpstream.cancel(); + downstream.onFailure(throwable); + break; + default: + stateLock.unlock(); + Infrastructure.handleDroppedException(failure); + break; } } - @Override - public void onCompletion() { - while (true) { - int state = this.state.get(); - if (state == STATE_NEW || state == STATE_READY) { - if (this.state.compareAndSet(state, STATE_TERMINATED)) { - terminateDownstream(); - return; - } - } else if (state == STATE_EMITTING) { - if (this.state.compareAndSet(state, STATE_OUTER_TERMINATED)) { - return; - } + private Throwable addFailure(Throwable failure) { + if (this.failure != null) { + if (this.failure instanceof CompositeException) { + this.failure = new CompositeException((CompositeException) this.failure, failure); } else { - return; + this.failure = new CompositeException(this.failure, failure); } + } else { + this.failure = failure; } + return this.failure; } - public synchronized void tryEmit(O value) { - switch (state.get()) { - case STATE_EMITTING: - case STATE_OUTER_TERMINATED: - downstream.onItem(value); + @Override + public void onCompletion() { + stateLock.lock(); + switch (state) { + case EMITTING: + state = State.EMITTING_FINAL; + stateLock.unlock(); + break; + case READY: + case PUBLISHER_REQUESTED: + stateLock.unlock(); + terminate(); break; default: + stateLock.unlock(); break; } } - public void innerComplete(long emitted) { - if (this.state.compareAndSet(STATE_EMITTING, STATE_READY)) { - // Inner completed but there are outstanding requests from inner, - // Or the inner completed without producing any items - // Request new item from upstream - if (inner.requested() != 0L || emitted == 0) { - upstream.request(1); - } else { - deferredUpstreamRequest.set(true); - } - } else if (this.state.compareAndSet(STATE_OUTER_TERMINATED, STATE_TERMINATED)) { - terminateDownstream(); + private void innerOnCompletion() { + stateLock.lock(); + switch (state) { + case EMITTING: + if (demand > 0L) { + state = State.PUBLISHER_REQUESTED; + stateLock.unlock(); + mainUpstream.request(1L); + } else { + state = State.READY; + stateLock.unlock(); + } + break; + case EMITTING_FINAL: + stateLock.unlock(); + terminate(); + break; + default: + stateLock.unlock(); + break; } } - public void innerFailure(Throwable e, long emitted) { - if (postponeFailure(e, upstream)) { - innerComplete(emitted); + private void terminate() { + if (STATE_UPDATER.getAndSet(this, State.DONE) != State.DONE) { + if (failure != null) { + downstream.onFailure(failure); + } else { + downstream.onCompletion(); + } } } - private boolean postponeFailure(Throwable e, Subscription subscription) { - if (e == null) { - return true; - } - - Subscriptions.addFailure(failures, e); - - if (delayError) { - return true; - } - - while (true) { - int state = this.state.get(); - if (state == STATE_CANCELLED || state == STATE_TERMINATED) { - return false; - } else { - if (this.state.compareAndSet(state, STATE_TERMINATED)) { - subscription.cancel(); - synchronized (this) { - downstream.onFailure(failures.get()); - } - return false; - } + @Override + public void request(long n) { + if (n <= 0) { + state = State.DONE; + downstream.onFailure(Subscriptions.getInvalidRequestException()); + } else { + Subscriptions.add(DEMAND_UPDATER, this, n); + stateLock.lock(); + switch (state) { + case EMITTING: + case EMITTING_FINAL: + stateLock.unlock(); + innerUpstream.request(n); + break; + case READY: + state = State.PUBLISHER_REQUESTED; + stateLock.unlock(); + mainUpstream.request(1L); + break; + default: + stateLock.unlock(); + break; } } } - private void terminateDownstream() { - Throwable ex = failures.get(); - if (ex != null) { - downstream.onFailure(ex); - return; + @Override + public void cancel() { + mainUpstream.cancel(); + if (innerUpstream != null) { + innerUpstream.cancel(); } - downstream.onCompletion(); } @Override @@ -258,57 +276,45 @@ public Context context() { } } - } - - static final class ConcatMapInner extends SwitchableSubscriptionSubscriber { - private final ConcatMapMainSubscriber parent; - - long emitted; - - /** - * Downstream passed as {@code null} to {@link SwitchableSubscriptionSubscriber} as accessors are not reachable. - * Effective downstream is {@code parent}. - * - * @param parent parent as downstream - */ - ConcatMapInner(ConcatMapMainSubscriber parent) { - super(null); - this.parent = parent; - } - - @Override - public void onItem(O item) { - emitted++; - parent.tryEmit(item); - } - - @Override - public void onFailure(Throwable failure) { - long p = emitted; + private class InnerSubscriber implements MultiSubscriber, ContextSupport { - if (p != 0L) { - emitted = 0L; - emitted(p); + @Override + public void onSubscribe(Flow.Subscription subscription) { + stateLock.lock(); + innerUpstream = subscription; + try { + long n = demand; + if (n > 0L) { + subscription.request(n); + } + } finally { + stateLock.unlock(); + } } - parent.innerFailure(failure, p); - } - - @Override - public void onCompletion() { - long p = emitted; + @Override + public void onItem(O item) { + innerOnItem(item); + } - if (p != 0L) { - emitted = 0L; - emitted(p); + @Override + public void onFailure(Throwable failure) { + innerOnFailure(failure); } - parent.innerComplete(p); - } + @Override + public void onCompletion() { + innerOnCompletion(); + } - @Override - public Context context() { - return parent.context(); + @Override + public Context context() { + if (downstream instanceof ContextSupport) { + return ((ContextSupport) downstream).context(); + } else { + return Context.empty(); + } + } } } } diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiConcatMapNoPrefetchTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiConcatMapNoPrefetchTest.java index d9b12ebae..d4b75c92a 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiConcatMapNoPrefetchTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiConcatMapNoPrefetchTest.java @@ -271,6 +271,31 @@ void testInnerCompleteSubscriptionAfterTermination() { ts.awaitCompletion(); } + @Test + void testPostponedFailureWithBoundaryDemand() { + AssertSubscriber sub = Multi.createFrom().items(1, 2, 3) + .onItem().transformToMulti(n -> Multi.createFrom(). emitter(emitter -> { + emitter.emit("foo"); + emitter.emit("bar"); + emitter.fail(new Exception("baz")); + })).collectFailures().concatenate() + .subscribe().withSubscriber(AssertSubscriber.create()); + + sub.request(2); + sub.assertNotTerminated().assertItems("foo", "bar"); + + sub.request(2); + sub.assertNotTerminated().assertItems("foo", "bar", "foo", "bar"); + + sub.request(3); + sub.assertTerminated().assertItems("foo", "bar", "foo", "bar", "foo", "bar"); + + sub.assertFailedWith(CompositeException.class); + assertThat(((CompositeException) sub.getFailure()).getCauses()) + .hasSize(3) + .allMatch(err -> "baz".equals(err.getMessage())); + } + @Test void testUpfrontCompletion() { AssertSubscriber sub = Multi.createFrom().empty()