diff --git a/implementation/revapi.json b/implementation/revapi.json index 6dcbd5386..524ce9c19 100644 --- a/implementation/revapi.json +++ b/implementation/revapi.json @@ -35,7 +35,15 @@ "criticality": "highlight", "minSeverity": "POTENTIALLY_BREAKING", "minCriticality": "documented", - "differences": [] + "differences": [ + { + "ignore": true, + "code": "java.method.numberOfParametersChanged", + "old": "method void io.smallrye.mutiny.groups.MultiRetry::(io.smallrye.mutiny.Multi, java.util.function.Predicate)", + "new": "method void io.smallrye.mutiny.groups.MultiRetry::(io.smallrye.mutiny.Multi, java.util.function.Predicate, java.util.function.Predicate)", + "justification": "MultiRetry is an operator class (not user facing). The change adds support for the onFailure predicate" + } + ] } }, { diff --git a/implementation/src/main/java/io/smallrye/mutiny/groups/MultiRetry.java b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiRetry.java index f9431c4a4..20b52666b 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/groups/MultiRetry.java +++ b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiRetry.java @@ -21,15 +21,16 @@ public class MultiRetry { private final Multi upstream; - private final Predicate predicate; + private final Predicate onFailurePredicate; private Duration initialBackOff = Duration.ofSeconds(1); private Duration maxBackoff = ExponentialBackoff.MAX_BACKOFF; private double jitter = ExponentialBackoff.DEFAULT_JITTER; private boolean backOffConfigured = false; - public MultiRetry(Multi upstream, Predicate predicate) { + public MultiRetry(Multi upstream, + Predicate onFailurePredicate) { this.upstream = nonNull(upstream, "upstream"); - this.predicate = predicate; + this.onFailurePredicate = nonNull(onFailurePredicate, "onFailurePredicate"); } /** @@ -63,14 +64,10 @@ public Multi atMost(long numberOfAttempts) { .randomExponentialBackoffFunction(numberOfAttempts, initialBackOff, maxBackoff, jitter, Infrastructure.getDefaultWorkerPool()); - if (predicate != null) { - whenStreamFactory = addPredicateToBackoffFactory(whenStreamFactory); - } - return Infrastructure.onMultiCreation( - new MultiRetryWhenOp<>(upstream, whenStreamFactory)); + new MultiRetryWhenOp<>(upstream, onFailurePredicate, whenStreamFactory)); } else { - return Infrastructure.onMultiCreation(new MultiRetryOp<>(upstream, numberOfAttempts)); + return Infrastructure.onMultiCreation(new MultiRetryOp<>(upstream, onFailurePredicate, numberOfAttempts)); } } @@ -100,28 +97,8 @@ public Multi expireAt(long expireAt) { initialBackOff, maxBackoff, jitter, Infrastructure.getDefaultWorkerPool()); - if (predicate != null) { - whenStreamFactory = addPredicateToBackoffFactory(whenStreamFactory); - } - return Infrastructure.onMultiCreation( - new MultiRetryWhenOp<>(upstream, whenStreamFactory)); - } - - private Function, Publisher> addPredicateToBackoffFactory( - Function, Publisher> whenStreamFactory) { - return whenStreamFactory.compose(value -> value.onItem() - .transformToUni(throwable -> Uni.createFrom(). emitter(emitter -> { - try { - if (predicate.test(throwable)) { - emitter.complete(throwable); - } else { - emitter.fail(throwable); - } - } catch (Throwable err) { - emitter.fail(err); - } - })).concatenate()); + new MultiRetryWhenOp<>(upstream, onFailurePredicate, whenStreamFactory)); } /** @@ -172,7 +149,8 @@ public Multi until(Predicate predicate) { } })) .concatenate(); - return Infrastructure.onMultiCreation(new MultiRetryWhenOp<>(upstream, whenStreamFactory)); + return Infrastructure + .onMultiCreation(new MultiRetryWhenOp<>(upstream, onFailurePredicate, whenStreamFactory)); } /** @@ -195,7 +173,7 @@ public Multi when(Function, ? extends Publisher> whenStre } Function, ? extends Publisher> actual = Infrastructure .decorate(nonNull(whenStreamFactory, "whenStreamFactory")); - return Infrastructure.onMultiCreation(new MultiRetryWhenOp<>(upstream, actual)); + return Infrastructure.onMultiCreation(new MultiRetryWhenOp<>(upstream, onFailurePredicate, actual)); } /** diff --git a/implementation/src/main/java/io/smallrye/mutiny/groups/UniRetry.java b/implementation/src/main/java/io/smallrye/mutiny/groups/UniRetry.java index 1bf0f2c20..d378e17fb 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/groups/UniRetry.java +++ b/implementation/src/main/java/io/smallrye/mutiny/groups/UniRetry.java @@ -21,7 +21,7 @@ public class UniRetry { private final Uni upstream; - private final Predicate predicate; + private final Predicate onFailurePredicate; private Duration initialBackOffDuration = Duration.ofSeconds(1); private Duration maxBackoffDuration = ExponentialBackoff.MAX_BACKOFF; @@ -29,9 +29,9 @@ public class UniRetry { private boolean backOffConfigured = false; - public UniRetry(Uni upstream, Predicate predicate) { + public UniRetry(Uni upstream, Predicate onFailurePredicate) { this.upstream = upstream; - this.predicate = predicate; + this.onFailurePredicate = onFailurePredicate; } /** @@ -59,17 +59,12 @@ public Uni indefinitely() { @CheckReturnValue public Uni atMost(long numberOfAttempts) { if (!backOffConfigured) { - return Infrastructure.onUniCreation(new UniRetryAtMost<>(upstream, predicate, numberOfAttempts)); + return Infrastructure.onUniCreation(new UniRetryAtMost<>(upstream, onFailurePredicate, numberOfAttempts)); } else { Function, Publisher> factory = ExponentialBackoff .randomExponentialBackoffFunction(numberOfAttempts, initialBackOffDuration, maxBackoffDuration, jitter, Infrastructure.getDefaultWorkerPool()); - - if (predicate != null) { - factory = addPredicateToBackoffFactory(factory); - } - - return upstream.toMulti().onFailure().retry().when(factory).toUni(); + return upstream.toMulti().onFailure(onFailurePredicate).retry().when(factory).toUni(); } } @@ -83,7 +78,6 @@ public Uni atMost(long numberOfAttempts) { * @param expireAt absolute time in millis that specifies when to give up * @return a new {@link Uni} retrying to subscribe to the current {@link Uni} until it gets an item or until * expiration {@code expireAt}. When the expiration is reached, the last failure is propagated. - * * @throws IllegalArgumentException if back off not configured, */ @CheckReturnValue @@ -95,28 +89,7 @@ public Uni expireAt(long expireAt) { Function, Publisher> factory = ExponentialBackoff .randomExponentialBackoffFunctionExpireAt(expireAt, initialBackOffDuration, maxBackoffDuration, jitter, Infrastructure.getDefaultWorkerPool()); - - if (predicate != null) { - factory = addPredicateToBackoffFactory(factory); - } - - return upstream.toMulti().onFailure().retry().when(factory).toUni(); - } - - private Function, Publisher> addPredicateToBackoffFactory( - Function, Publisher> factory) { - return factory.compose(value -> value.onItem() - .transformToUni(throwable -> Uni.createFrom(). emitter(emitter -> { - try { - if (predicate.test(throwable)) { - emitter.complete(throwable); - } else { - emitter.fail(throwable); - } - } catch (Throwable err) { - emitter.fail(err); - } - })).concatenate()); + return upstream.toMulti().onFailure(onFailurePredicate).retry().when(factory).toUni(); } /** @@ -129,7 +102,6 @@ private Function, Publisher> addPredicateToBackoffFactory * @param expireIn relative time in millis that specifies when to give up * @return a new {@link Uni} retrying to subscribe to the current {@link Uni} until it gets an item or until * expiration {@code expireIn}. When the expiration is reached, the last failure is propagated. - * * @throws IllegalArgumentException if back off not configured, */ @CheckReturnValue @@ -186,7 +158,7 @@ public Uni when(Function, ? extends Publisher> whenStream } Function, ? extends Publisher> actual = Infrastructure .decorate(nonNull(whenStreamFactory, "whenStreamFactory")); - return upstream.toMulti().onFailure().retry().when(actual).toUni(); + return upstream.toMulti().onFailure(this.onFailurePredicate).retry().when(actual).toUni(); } /** diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiRetryOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiRetryOp.java index 8e92f1849..3b0d948b8 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiRetryOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiRetryOp.java @@ -1,16 +1,18 @@ package io.smallrye.mutiny.operators.multi; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; import org.reactivestreams.Publisher; +import io.smallrye.mutiny.CompositeException; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.infrastructure.Infrastructure; import io.smallrye.mutiny.subscription.MultiSubscriber; import io.smallrye.mutiny.subscription.SwitchableSubscriptionSubscriber; /** - * Multi operator re-subscribing to the upstream if if receives a failure event. + * Multi operator re-subscribing to the upstream if it receives a failure event. * It can re-subscribe indefinitely (passing Long.MAX_VALUE as number of attempts) or a fixed number of times. * * @param the type of item @@ -18,15 +20,17 @@ public final class MultiRetryOp extends AbstractMultiOperator { private final long times; + private final Predicate onFailurePredicate; - public MultiRetryOp(Multi upstream, long times) { + public MultiRetryOp(Multi upstream, Predicate onFailurePredicate, long times) { super(upstream); + this.onFailurePredicate = onFailurePredicate; this.times = times; } @Override public void subscribe(MultiSubscriber downstream) { - RetrySubscriber subscriber = new RetrySubscriber<>(upstream, downstream, times); + RetrySubscriber subscriber = new RetrySubscriber<>(upstream, onFailurePredicate, downstream, times); downstream.onSubscribe(subscriber); @@ -43,10 +47,14 @@ static final class RetrySubscriber extends SwitchableSubscriptionSubscriber upstream, MultiSubscriber downstream, long attempts) { + private final Predicate onFailurePredicate; + + RetrySubscriber(Publisher upstream, Predicate onFailurePredicate, + MultiSubscriber downstream, long attempts) { super(downstream); this.upstream = upstream; this.remaining = attempts; + this.onFailurePredicate = onFailurePredicate; } @Override @@ -57,6 +65,10 @@ public void onItem(T t) { @Override public void onFailure(Throwable t) { + if (testOnFailurePredicate(t)) { + return; + } + long r = remaining; if (r != Long.MAX_VALUE) { if (r == 0) { @@ -69,6 +81,22 @@ public void onFailure(Throwable t) { resubscribe(); } + private boolean testOnFailurePredicate(Throwable t) { + // The onFailurePredicate cannot be null. + try { + if (!onFailurePredicate.test(t)) { + cancel(); + downstream.onFailure(t); + return true; + } + } catch (Throwable e) { + cancel(); + downstream.onFailure(new CompositeException(e, t)); + return true; + } + return false; + } + void resubscribe() { if (wip.getAndIncrement() == 0) { do { diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiRetryWhenOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiRetryWhenOp.java index 24ee9bd17..c78dc0293 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiRetryWhenOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiRetryWhenOp.java @@ -2,12 +2,14 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; +import java.util.function.Predicate; import org.reactivestreams.Processor; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import io.smallrye.mutiny.CompositeException; import io.smallrye.mutiny.Context; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.helpers.Subscriptions; @@ -29,14 +31,16 @@ public final class MultiRetryWhenOp extends AbstractMultiOperator { private final Function, ? extends Publisher> triggerStreamFactory; + private final Predicate onFailurePredicate; - public MultiRetryWhenOp(Multi upstream, + public MultiRetryWhenOp(Multi upstream, Predicate onFailurePredicate, Function, ? extends Publisher> triggerStreamFactory) { super(upstream); + this.onFailurePredicate = onFailurePredicate; this.triggerStreamFactory = triggerStreamFactory; } - private static void subscribe(MultiSubscriber downstream, + private static void subscribe(MultiSubscriber downstream, Predicate onFailurePredicate, Function, ? extends Publisher> triggerStreamFactory, Multi upstream) { TriggerSubscriber other = new TriggerSubscriber(); @@ -44,7 +48,7 @@ private static void subscribe(MultiSubscriber downstream, signaller.onSubscribe(Subscriptions.empty()); MultiSubscriber serialized = new SerializedSubscriber<>(downstream); - RetryWhenOperator operator = new RetryWhenOperator<>(upstream, serialized, signaller); + RetryWhenOperator operator = new RetryWhenOperator<>(upstream, onFailurePredicate, serialized, signaller); other.operator = operator; serialized.onSubscribe(operator); @@ -69,7 +73,7 @@ private static void subscribe(MultiSubscriber downstream, @Override public void subscribe(MultiSubscriber downstream) { - subscribe(downstream, triggerStreamFactory, upstream); + subscribe(downstream, onFailurePredicate, triggerStreamFactory, upstream); } static final class RetryWhenOperator extends SwitchableSubscriptionSubscriber { @@ -78,12 +82,15 @@ static final class RetryWhenOperator extends SwitchableSubscriptionSubscriber private final AtomicInteger wip = new AtomicInteger(); private final Subscriber signaller; private final Subscriptions.DeferredSubscription arbiter = new Subscriptions.DeferredSubscription(); + private final Predicate onFailurePredicate; long produced; - RetryWhenOperator(Publisher upstream, MultiSubscriber downstream, + RetryWhenOperator(Publisher upstream, Predicate onFailurePredicate, + MultiSubscriber downstream, Subscriber signaller) { super(downstream); + this.onFailurePredicate = onFailurePredicate; this.upstream = upstream; this.signaller = signaller; } @@ -109,6 +116,10 @@ public void onItem(T t) { @Override public void onFailure(Throwable t) { + if (testOnFailurePredicate(t)) { + return; + } + long p = produced; if (p != 0L) { produced = 0; @@ -118,6 +129,20 @@ public void onFailure(Throwable t) { signaller.onNext(t); } + private boolean testOnFailurePredicate(Throwable t) { + try { + if (!onFailurePredicate.test(t)) { + arbiter.cancel(); + downstream.onFailure(t); + } + } catch (Throwable e) { + arbiter.cancel(); + downstream.onFailure(new CompositeException(e, t)); + return true; + } + return false; + } + @Override public void onCompletion() { arbiter.cancel(); diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniRetryAtMost.java b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniRetryAtMost.java index 1b101beea..a13b8e592 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniRetryAtMost.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniRetryAtMost.java @@ -6,6 +6,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.function.Predicate; +import io.smallrye.mutiny.CompositeException; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.infrastructure.Infrastructure; import io.smallrye.mutiny.operators.AbstractUni; @@ -78,7 +79,7 @@ private boolean testPredicate(Throwable failure) { try { passes = uniRetryAtMost.predicate.test(failure); } catch (Throwable e) { - downstream.onFailure(e); + downstream.onFailure(new CompositeException(e, failure)); return false; } if (!passes) { diff --git a/implementation/src/test/java/io/smallrye/mutiny/groups/UniOnFailureRetryTest.java b/implementation/src/test/java/io/smallrye/mutiny/groups/UniOnFailureRetryTest.java index a33be67fb..fd0ae6cce 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/groups/UniOnFailureRetryTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/groups/UniOnFailureRetryTest.java @@ -18,8 +18,12 @@ import org.junit.jupiter.api.parallel.ResourceAccessMode; import org.junit.jupiter.api.parallel.ResourceLock; +import io.smallrye.mutiny.CompositeException; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.helpers.test.UniAssertSubscriber; +import io.smallrye.mutiny.operators.MultiOnFailureRetryTest; +import io.smallrye.mutiny.unchecked.Unchecked; import junit5.support.InfrastructureResource; @ResourceLock(value = InfrastructureResource.NAME, mode = ResourceAccessMode.READ) @@ -271,4 +275,280 @@ public boolean test(Throwable throwable) { } } + @Test + public void checkThatItDoesOnlyRetryOnMatchingExceptionWithRetryAtMost() { + AtomicInteger count = new AtomicInteger(); + + Uni.createFrom().item(1) + .onItem().invoke(i -> { + if (count.getAndIncrement() == 0) { + throw new MultiOnFailureRetryTest.MyRuntimeException("boom"); + } + }) + .onFailure(MultiOnFailureRetryTest.MyRuntimeException.class).retry().atMost(2) + .subscribe().withSubscriber(UniAssertSubscriber.create()) + .assertItem(1); + + count.set(0); + + Uni.createFrom().item(1) + .onItem().invoke(Unchecked.consumer(i -> { + if (count.getAndIncrement() == 0) { + throw new MultiOnFailureRetryTest.MyException("boom"); + } + })) + .onFailure(t -> t.getMessage().contains("boom")).retry().atMost(2) + .subscribe().withSubscriber(UniAssertSubscriber.create()) + .assertCompleted() + .assertItem(1); + + count.set(0); + + Uni.createFrom().item(1) + .onItem().invoke(Unchecked.consumer(i -> { + if (count.getAndIncrement() == 0) { + throw new MultiOnFailureRetryTest.MyException("boom"); + } + })) + .onFailure(ArithmeticException.class).retry().atMost(2) + .subscribe().withSubscriber(UniAssertSubscriber.create()) + .assertFailedWith(RuntimeException.class, "boom"); + + count.set(0); + + Uni.createFrom().item(1) + .onItem().invoke(Unchecked.consumer(i -> { + if (count.getAndIncrement() == 0) { + throw new MultiOnFailureRetryTest.MyException("boom"); + } + })) + .onFailure(t -> t.getMessage().equalsIgnoreCase("wrong")).retry().atMost(2) + .subscribe().withSubscriber(UniAssertSubscriber.create()) + .assertFailedWith(RuntimeException.class, "boom"); + + count.set(0); + + Uni.createFrom().item(1) + .onItem().invoke(Unchecked.consumer(i -> { + if (count.getAndIncrement() == 0) { + throw new MultiOnFailureRetryTest.MyException("boom"); + } + })) + .onFailure(t -> { + throw new RuntimeException("expected"); + }).retry().atMost(2) + .subscribe().withSubscriber(UniAssertSubscriber.create()) + .assertFailedWith(CompositeException.class, "expected"); + } + + @Test + public void checkThatItDoesOnlyRetryOnMatchingExceptionWithRetryAtMostWithBackoff() { + AtomicInteger count = new AtomicInteger(); + + Uni.createFrom().item(1) + .onItem().invoke(i -> { + if (count.getAndIncrement() == 0) { + throw new MultiOnFailureRetryTest.MyRuntimeException("boom"); + } + }) + .onFailure(MultiOnFailureRetryTest.MyRuntimeException.class).retry() + .withBackOff(Duration.ofMillis(2)).atMost(2) + .subscribe().withSubscriber(UniAssertSubscriber.create()) + .awaitItem().assertItem(1); + + count.set(0); + + Uni.createFrom().item(1) + .onItem().invoke(Unchecked.consumer(i -> { + if (count.getAndIncrement() == 0) { + throw new MultiOnFailureRetryTest.MyException("boom"); + } + })) + .onFailure(t -> t.getMessage().contains("boom")).retry() + .withBackOff(Duration.ofMillis(2)).atMost(2) + .subscribe().withSubscriber(UniAssertSubscriber.create()) + .awaitItem() + .assertItem(1); + + count.set(0); + + Uni.createFrom().item(1) + .onItem().invoke(Unchecked.consumer(i -> { + if (count.getAndIncrement() == 0) { + throw new MultiOnFailureRetryTest.MyException("boom"); + } + })) + .onFailure(ArithmeticException.class).retry() + .withBackOff(Duration.ofMillis(2)).atMost(2) + .subscribe().withSubscriber(UniAssertSubscriber.create()) + .assertFailedWith(RuntimeException.class, "boom"); + + count.set(0); + + Uni.createFrom().item(1) + .onItem().invoke(Unchecked.consumer(i -> { + if (count.getAndIncrement() == 0) { + throw new MultiOnFailureRetryTest.MyException("boom"); + } + })) + .onFailure(t -> t.getMessage().equalsIgnoreCase("wrong")).retry() + .withBackOff(Duration.ofMillis(2)).atMost(2) + .subscribe().withSubscriber(UniAssertSubscriber.create()) + .assertFailedWith(RuntimeException.class, "boom"); + + count.set(0); + + Uni.createFrom().item(1) + .onItem().invoke(Unchecked.consumer(i -> { + if (count.getAndIncrement() == 0) { + throw new MultiOnFailureRetryTest.MyException("boom"); + } + })) + .onFailure(t -> { + throw new RuntimeException("expected"); + }).retry() + .withBackOff(Duration.ofMillis(2)).atMost(2) + .subscribe().withSubscriber(UniAssertSubscriber.create()) + .assertFailedWith(CompositeException.class, "expected"); + } + + @Test + public void checkThatItDoesOnlyRetryOnMatchingExceptionWithRetryAtMostWithExpireAt() { + AtomicInteger count = new AtomicInteger(); + + Uni.createFrom().item(1) + .onItem().invoke(i -> { + if (count.getAndIncrement() == 0) { + throw new MultiOnFailureRetryTest.MyRuntimeException("boom"); + } + }) + .onFailure(MultiOnFailureRetryTest.MyRuntimeException.class).retry() + .withBackOff(Duration.ofMillis(2)).expireAt(System.currentTimeMillis() + 10000) + .subscribe().withSubscriber(UniAssertSubscriber.create()) + .awaitItem().assertItem(1); + + count.set(0); + + Uni.createFrom().item(1) + .onItem().invoke(Unchecked.consumer(i -> { + if (count.getAndIncrement() == 0) { + throw new MultiOnFailureRetryTest.MyException("boom"); + } + })) + .onFailure(t -> t.getMessage().contains("boom")).retry() + .withBackOff(Duration.ofMillis(2)).expireAt(System.currentTimeMillis() + 10000) + .subscribe().withSubscriber(UniAssertSubscriber.create()) + .awaitItem() + .assertItem(1); + + count.set(0); + + Uni.createFrom().item(1) + .onItem().invoke(Unchecked.consumer(i -> { + if (count.getAndIncrement() == 0) { + throw new MultiOnFailureRetryTest.MyException("boom"); + } + })) + .onFailure(ArithmeticException.class).retry() + .withBackOff(Duration.ofMillis(2)).expireAt(System.currentTimeMillis() + 10000) + .subscribe().withSubscriber(UniAssertSubscriber.create()) + .assertFailedWith(RuntimeException.class, "boom"); + + count.set(0); + + Uni.createFrom().item(1) + .onItem().invoke(Unchecked.consumer(i -> { + if (count.getAndIncrement() == 0) { + throw new MultiOnFailureRetryTest.MyException("boom"); + } + })) + .onFailure(t -> t.getMessage().equalsIgnoreCase("wrong")).retry() + .withBackOff(Duration.ofMillis(2)).expireAt(System.currentTimeMillis() + 10000) + .subscribe().withSubscriber(UniAssertSubscriber.create()) + .assertFailedWith(RuntimeException.class, "boom"); + + count.set(0); + + Uni.createFrom().item(1) + .onItem().invoke(Unchecked.consumer(i -> { + if (count.getAndIncrement() == 0) { + throw new MultiOnFailureRetryTest.MyException("boom"); + } + })) + .onFailure(t -> { + throw new RuntimeException("expected"); + }).retry() + .withBackOff(Duration.ofMillis(2)).expireAt(System.currentTimeMillis() + 10000) + .subscribe().withSubscriber(UniAssertSubscriber.create()) + .assertFailedWith(CompositeException.class, "expected"); + } + + @Test + public void checkThatItDoesOnlyRetryOnMatchingExceptionWithRetryWhen() { + AtomicInteger count = new AtomicInteger(); + + Uni.createFrom().item(1) + .onItem().invoke(i -> { + if (count.getAndIncrement() == 0) { + throw new MultiOnFailureRetryTest.MyRuntimeException("boom"); + } + }) + .onFailure(MultiOnFailureRetryTest.MyRuntimeException.class).retry() + .when(t -> Multi.createFrom().items(1, 1, 1, 1)) + .subscribe().withSubscriber(UniAssertSubscriber.create()) + .assertItem(1); + + count.set(0); + + Uni.createFrom().item(1) + .onItem().invoke(Unchecked.consumer(i -> { + if (count.getAndIncrement() == 0) { + throw new MultiOnFailureRetryTest.MyException("boom"); + } + })) + .onFailure(t -> t.getMessage().contains("boom")).retry().when(t -> Multi.createFrom().items(1, 1, 1, 1)) + .subscribe().withSubscriber(UniAssertSubscriber.create()) + .assertCompleted() + .assertItem(1); + + count.set(0); + + Uni.createFrom().item(1) + .onItem().invoke(Unchecked.consumer(i -> { + if (count.getAndIncrement() == 0) { + throw new MultiOnFailureRetryTest.MyException("boom"); + } + })) + .onFailure(ArithmeticException.class).retry().when(t -> Multi.createFrom().items(1, 1, 1, 1)) + .subscribe().withSubscriber(UniAssertSubscriber.create()) + .assertFailedWith(RuntimeException.class, "boom"); + + count.set(0); + + Uni.createFrom().item(1) + .onItem().invoke(Unchecked.consumer(i -> { + if (count.getAndIncrement() == 0) { + throw new MultiOnFailureRetryTest.MyException("boom"); + } + })) + .onFailure(t -> t.getMessage().equalsIgnoreCase("wrong")).retry() + .when(t -> Multi.createFrom().items(1, 1, 1, 1)) + .subscribe().withSubscriber(UniAssertSubscriber.create()) + .assertFailedWith(RuntimeException.class, "boom"); + + count.set(0); + + Uni.createFrom().item(1) + .onItem().invoke(Unchecked.consumer(i -> { + if (count.getAndIncrement() == 0) { + throw new MultiOnFailureRetryTest.MyException("boom"); + } + })) + .onFailure(t -> { + throw new RuntimeException("expected"); + }).retry().when(t -> Multi.createFrom().items(1, 1, 1, 1)) + .subscribe().withSubscriber(UniAssertSubscriber.create()) + .assertFailedWith(CompositeException.class, "expected"); + } + } diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnFailureRetryTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnFailureRetryTest.java index 043390393..e03bc20bd 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnFailureRetryTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnFailureRetryTest.java @@ -13,9 +13,11 @@ import org.junit.jupiter.api.parallel.ResourceAccessMode; import org.junit.jupiter.api.parallel.ResourceLock; +import io.smallrye.mutiny.CompositeException; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.groups.MultiRetry; import io.smallrye.mutiny.helpers.test.AssertSubscriber; +import io.smallrye.mutiny.unchecked.Unchecked; import junit5.support.InfrastructureResource; @ResourceLock(value = InfrastructureResource.NAME, mode = ResourceAccessMode.READ) @@ -168,4 +170,292 @@ public void testJitterValidation() { .onFailure().retry().withJitter(2)); } + @Test + public void checkThatItDoesOnlyRetryOnMatchingExceptionWithRetryAtMost() { + AtomicInteger count = new AtomicInteger(); + + Multi.createFrom().items(1, 2, 3, 4) + .onItem().invoke(i -> { + if (count.getAndIncrement() < 2) { + throw new MyRuntimeException("boom"); + } + }) + .onFailure(MyRuntimeException.class).retry().atMost(2) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .assertCompleted() + .assertItems(1, 2, 3, 4); + + count.set(0); + + Multi.createFrom().items(1, 2, 3, 4) + .onItem().invoke(Unchecked.consumer(i -> { + if (count.getAndIncrement() < 2) { + throw new MyException("boom"); + } + })) + .onFailure(t -> t.getMessage().contains("boom")).retry().atMost(2) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .assertCompleted() + .assertItems(1, 2, 3, 4); + + count.set(0); + + Multi.createFrom().items(1, 2, 3, 4) + .onItem().invoke(Unchecked.consumer(i -> { + if (count.getAndIncrement() < 2) { + throw new MyException("boom"); + } + })) + .onFailure(ArithmeticException.class).retry().atMost(2) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .assertFailedWith(RuntimeException.class, "boom"); + + count.set(0); + + Multi.createFrom().items(1, 2, 3, 4) + .onItem().invoke(Unchecked.consumer(i -> { + if (count.getAndIncrement() < 2) { + throw new MyException("boom"); + } + })) + .onFailure(t -> t.getMessage().equalsIgnoreCase("wrong")).retry().atMost(2) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .assertFailedWith(RuntimeException.class, "boom"); + + count.set(0); + + Multi.createFrom().items(1, 2, 3, 4) + .onItem().invoke(Unchecked.consumer(i -> { + if (count.getAndIncrement() < 2) { + throw new MyException("boom"); + } + })) + .onFailure(t -> { + throw new RuntimeException("expected"); + }).retry().atMost(2) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .assertFailedWith(CompositeException.class, "expected"); + } + + @Test + public void checkThatItDoesOnlyRetryOnMatchingExceptionWithRetryAtMostAndBackOff() { + AtomicInteger count = new AtomicInteger(); + + Multi.createFrom().items(1, 2, 3, 4) + .onItem().invoke(i -> { + if (count.getAndIncrement() < 2) { + throw new MyRuntimeException("boom"); + } + }) + .onFailure(MyRuntimeException.class).retry().withBackOff(Duration.ofMillis(2)).atMost(2) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .awaitCompletion() + .assertItems(1, 2, 3, 4); + + count.set(0); + + Multi.createFrom().items(1, 2, 3, 4) + .onItem().invoke(Unchecked.consumer(i -> { + if (count.getAndIncrement() < 2) { + throw new MyException("boom"); + } + })) + .onFailure(t -> t.getMessage().contains("boom")).retry().withBackOff(Duration.ofMillis(2)).atMost(2) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .awaitCompletion() + .assertItems(1, 2, 3, 4); + + count.set(0); + + Multi.createFrom().items(1, 2, 3, 4) + .onItem().invoke(Unchecked.consumer(i -> { + if (count.getAndIncrement() < 2) { + throw new MyException("boom"); + } + })) + .onFailure(ArithmeticException.class).retry().withBackOff(Duration.ofMillis(2)).atMost(2) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .awaitFailure() + .assertFailedWith(RuntimeException.class, "boom"); + + count.set(0); + + Multi.createFrom().items(1, 2, 3, 4) + .onItem().invoke(Unchecked.consumer(i -> { + if (count.getAndIncrement() < 2) { + throw new MyException("boom"); + } + })) + .onFailure(t -> t.getMessage().equalsIgnoreCase("wrong")).retry() + .withBackOff(Duration.ofMillis(2)).atMost(2) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .awaitFailure() + .assertFailedWith(RuntimeException.class, "boom"); + + count.set(0); + + Multi.createFrom().items(1, 2, 3, 4) + .onItem().invoke(Unchecked.consumer(i -> { + if (count.getAndIncrement() < 2) { + throw new MyException("boom"); + } + })) + .onFailure(t -> { + throw new RuntimeException("expected"); + }).retry().withBackOff(Duration.ofMillis(2)).atMost(2) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .awaitFailure() + .assertFailedWith(CompositeException.class, "expected"); + } + + @Test + public void checkThatItDoesOnlyRetryOnMatchingExceptionWithRetryWhen() { + AtomicInteger count = new AtomicInteger(); + + Multi.createFrom().items(1, 2, 3, 4) + .onItem().invoke(i -> { + if (count.getAndIncrement() < 2) { + throw new MyRuntimeException("boom"); + } + }) + .onFailure(MyRuntimeException.class).retry().when(t -> Multi.createFrom().items(1, 1, 1, 1)) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .assertCompleted() + .assertItems(1, 2, 3, 4); + + count.set(0); + + Multi.createFrom().items(1, 2, 3, 4) + .onItem().invoke(Unchecked.consumer(i -> { + if (count.getAndIncrement() < 2) { + throw new MyException("boom"); + } + })) + .onFailure(t -> t.getMessage().contains("boom")).retry().when(t -> Multi.createFrom().items(1, 1, 1, 1)) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .assertCompleted() + .assertItems(1, 2, 3, 4); + + count.set(0); + + Multi.createFrom().items(1, 2, 3, 4) + .onItem().invoke(Unchecked.consumer(i -> { + if (count.getAndIncrement() < 2) { + throw new MyException("boom"); + } + })) + .onFailure(ArithmeticException.class).retry().when(t -> Multi.createFrom().items(1, 1, 1, 1)) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .assertFailedWith(RuntimeException.class, "boom"); + + count.set(0); + + Multi.createFrom().items(1, 2, 3, 4) + .onItem().invoke(Unchecked.consumer(i -> { + if (count.getAndIncrement() < 2) { + throw new MyException("boom"); + } + })) + .onFailure(t -> t.getMessage().equalsIgnoreCase("wrong")) + .retry().when(t -> Multi.createFrom().items(1, 1, 1, 1)) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .assertFailedWith(RuntimeException.class, "boom"); + + count.set(0); + + Multi.createFrom().items(1, 2, 3, 4) + .onItem().invoke(Unchecked.consumer(i -> { + if (count.getAndIncrement() < 2) { + throw new MyException("boom"); + } + })) + .onFailure(t -> { + throw new RuntimeException("expected"); + }).retry().when(t -> Multi.createFrom().items(1, 1, 1, 1)) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .assertFailedWith(CompositeException.class, "expected"); + } + + @Test + public void checkThatItDoesOnlyRetryOnMatchingExceptionWithRetryUntil() { + AtomicInteger count = new AtomicInteger(); + + Multi.createFrom().items(1, 2, 3, 4) + .onItem().invoke(i -> { + if (count.getAndIncrement() < 2) { + throw new MyRuntimeException("boom"); + } + }) + .onFailure(MyRuntimeException.class).retry().until(t -> true) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .assertCompleted() + .assertItems(1, 2, 3, 4); + + count.set(0); + + Multi.createFrom().items(1, 2, 3, 4) + .onItem().invoke(Unchecked.consumer(i -> { + if (count.getAndIncrement() < 2) { + throw new MyException("boom"); + } + })) + .onFailure(t -> t.getMessage().contains("boom")).retry().until(t -> true) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .assertCompleted() + .assertItems(1, 2, 3, 4); + + count.set(0); + + Multi.createFrom().items(1, 2, 3, 4) + .onItem().invoke(Unchecked.consumer(i -> { + if (count.getAndIncrement() < 2) { + throw new MyException("boom"); + } + })) + .onFailure(ArithmeticException.class).retry().until(t -> true) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .assertFailedWith(RuntimeException.class, "boom"); + + count.set(0); + + Multi.createFrom().items(1, 2, 3, 4) + .onItem().invoke(Unchecked.consumer(i -> { + if (count.getAndIncrement() < 2) { + throw new MyException("boom"); + } + })) + .onFailure(t -> t.getMessage().equalsIgnoreCase("wrong")) + .retry().until(t -> true) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .assertFailedWith(RuntimeException.class, "boom"); + + count.set(0); + + Multi.createFrom().items(1, 2, 3, 4) + .onItem().invoke(Unchecked.consumer(i -> { + if (count.getAndIncrement() < 2) { + throw new MyException("boom"); + } + })) + .onFailure(t -> { + throw new RuntimeException("expected"); + }).retry().until(t -> true) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .assertFailedWith(CompositeException.class, "expected"); + } + + public static class MyException extends Exception { + + public MyException(String m) { + super(m); + } + } + + public static class MyRuntimeException extends RuntimeException { + + public MyRuntimeException(String m) { + super(m); + } + } + } diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnFailureRetryUntilTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnFailureRetryUntilTest.java index 28e815f0d..d64b70ffb 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnFailureRetryUntilTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnFailureRetryUntilTest.java @@ -73,7 +73,7 @@ public void testTwoRetriesAndGiveUp() { .onFailure().retry().until(retryTwice) .subscribe().withSubscriber(subscriber); - subscriber.assertItems(0, 1, 0, 1, 0, 1).assertFailedWith(Exception.class, "boom"); + subscriber.assertFailedWith(Exception.class, "boom").assertItems(0, 1, 0, 1, 0, 1); } @Test diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnFailureRetryWhenTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnFailureRetryWhenTest.java index 9111a6c4a..e95c7023e 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnFailureRetryWhenTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnFailureRetryWhenTest.java @@ -42,7 +42,7 @@ public void init() { @Test public void testThatUpstreamCannotBeNull() { - assertThrows(IllegalArgumentException.class, () -> new MultiRetryWhenOp<>(null, v -> v)); + assertThrows(IllegalArgumentException.class, () -> new MultiRetryWhenOp<>(null, null, v -> v)); } @Test diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/UniOnFailureRetryUntilTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/UniOnFailureRetryUntilTest.java index 582d4b589..231723359 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/UniOnFailureRetryUntilTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/UniOnFailureRetryUntilTest.java @@ -5,13 +5,16 @@ import java.io.IOException; import java.time.Duration; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; import org.junit.jupiter.api.Test; +import io.smallrye.mutiny.CompositeException; import io.smallrye.mutiny.TestException; import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.helpers.test.UniAssertSubscriber; public class UniOnFailureRetryUntilTest { @@ -173,4 +176,42 @@ public void testThatYouCannotUseUntilIfBackoffIsConfigured() { .onFailure().retry().withBackOff(Duration.ofSeconds(1)).until(t -> true)); } + /** + * Reproducer for https://github.com/smallrye/smallrye-mutiny/discussions/814. + */ + @Test + public void checkThatThePredicateIsUsedWithOnFailureUntil() { + AtomicBoolean first = new AtomicBoolean(); + AtomicBoolean second = new AtomicBoolean(); + UniAssertSubscriber subscriber = Uni.createFrom().failure(new MultiOnFailureRetryTest.MyException("")) + .replaceWithVoid() + .onFailure(e -> !(e instanceof MultiOnFailureRetryTest.MyException)).retry().until(x -> { + first.set(true); + return false; + }) + .onFailure(MultiOnFailureRetryTest.MyException.class).retry().until(x -> { + second.set(true); + return false; + }) + .subscribe().withSubscriber(UniAssertSubscriber.create()); + subscriber.awaitFailure(t -> assertThat(t).isInstanceOf(MultiOnFailureRetryTest.MyException.class)); + assertThat(first).isFalse(); + assertThat(second).isTrue(); + } + + @Test + public void testRetryWhenOnFailurePredicateFails() { + AtomicInteger count = new AtomicInteger(); + UniAssertSubscriber subscriber = Uni.createFrom() + .failure(() -> new MultiOnFailureRetryTest.MyException("BOOM " + count.getAndIncrement())) + .replaceWithVoid() + .onFailure(t -> { + throw new RuntimeException("expected"); + }).retry().until(x -> false) + .subscribe().withSubscriber(UniAssertSubscriber.create()); + + subscriber.awaitFailure(t -> assertThat(t).isInstanceOf(CompositeException.class).hasMessageContaining("expected")); + assertThat(count).hasValue(1); + } + }