Skip to content

Commit

Permalink
MultiOnRetry was ignoring the onFailure predicate.
Browse files Browse the repository at this point in the history
  • Loading branch information
cescoffier committed Jan 19, 2022
1 parent 6e0581d commit d963c73
Show file tree
Hide file tree
Showing 11 changed files with 703 additions and 80 deletions.
10 changes: 9 additions & 1 deletion implementation/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,15 @@
"criticality": "highlight",
"minSeverity": "POTENTIALLY_BREAKING",
"minCriticality": "documented",
"differences": []
"differences": [
{
"ignore": true,
"code": "java.method.numberOfParametersChanged",
"old": "method void io.smallrye.mutiny.groups.MultiRetry<T>::<init>(io.smallrye.mutiny.Multi<T>, java.util.function.Predicate<? super java.lang.Throwable>)",
"new": "method void io.smallrye.mutiny.groups.MultiRetry<T>::<init>(io.smallrye.mutiny.Multi<T>, java.util.function.Predicate<? super java.lang.Throwable>, java.util.function.Predicate<? super java.lang.Throwable>)",
"justification": "MultiRetry is an operator class (not user facing). The change adds support for the onFailure predicate"
}
]
}
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@
public class MultiRetry<T> {

private final Multi<T> upstream;
private final Predicate<? super Throwable> predicate;
private final Predicate<? super Throwable> onFailurePredicate;
private Duration initialBackOff = Duration.ofSeconds(1);
private Duration maxBackoff = ExponentialBackoff.MAX_BACKOFF;
private double jitter = ExponentialBackoff.DEFAULT_JITTER;
private boolean backOffConfigured = false;

public MultiRetry(Multi<T> upstream, Predicate<? super Throwable> predicate) {
public MultiRetry(Multi<T> upstream,
Predicate<? super Throwable> onFailurePredicate) {
this.upstream = nonNull(upstream, "upstream");
this.predicate = predicate;
this.onFailurePredicate = nonNull(onFailurePredicate, "onFailurePredicate");
}

/**
Expand Down Expand Up @@ -63,14 +64,10 @@ public Multi<T> atMost(long numberOfAttempts) {
.randomExponentialBackoffFunction(numberOfAttempts, initialBackOff, maxBackoff, jitter,
Infrastructure.getDefaultWorkerPool());

if (predicate != null) {
whenStreamFactory = addPredicateToBackoffFactory(whenStreamFactory);
}

return Infrastructure.onMultiCreation(
new MultiRetryWhenOp<>(upstream, whenStreamFactory));
new MultiRetryWhenOp<>(upstream, onFailurePredicate, whenStreamFactory));
} else {
return Infrastructure.onMultiCreation(new MultiRetryOp<>(upstream, numberOfAttempts));
return Infrastructure.onMultiCreation(new MultiRetryOp<>(upstream, onFailurePredicate, numberOfAttempts));
}

}
Expand Down Expand Up @@ -100,28 +97,8 @@ public Multi<T> expireAt(long expireAt) {
initialBackOff, maxBackoff, jitter,
Infrastructure.getDefaultWorkerPool());

if (predicate != null) {
whenStreamFactory = addPredicateToBackoffFactory(whenStreamFactory);
}

return Infrastructure.onMultiCreation(
new MultiRetryWhenOp<>(upstream, whenStreamFactory));
}

private Function<Multi<Throwable>, Publisher<Long>> addPredicateToBackoffFactory(
Function<Multi<Throwable>, Publisher<Long>> whenStreamFactory) {
return whenStreamFactory.compose(value -> value.onItem()
.transformToUni(throwable -> Uni.createFrom().<Throwable> emitter(emitter -> {
try {
if (predicate.test(throwable)) {
emitter.complete(throwable);
} else {
emitter.fail(throwable);
}
} catch (Throwable err) {
emitter.fail(err);
}
})).concatenate());
new MultiRetryWhenOp<>(upstream, onFailurePredicate, whenStreamFactory));
}

/**
Expand Down Expand Up @@ -172,7 +149,8 @@ public Multi<T> until(Predicate<? super Throwable> predicate) {
}
}))
.concatenate();
return Infrastructure.onMultiCreation(new MultiRetryWhenOp<>(upstream, whenStreamFactory));
return Infrastructure
.onMultiCreation(new MultiRetryWhenOp<>(upstream, onFailurePredicate, whenStreamFactory));
}

/**
Expand All @@ -195,7 +173,7 @@ public Multi<T> when(Function<Multi<Throwable>, ? extends Publisher<?>> whenStre
}
Function<Multi<Throwable>, ? extends Publisher<?>> actual = Infrastructure
.decorate(nonNull(whenStreamFactory, "whenStreamFactory"));
return Infrastructure.onMultiCreation(new MultiRetryWhenOp<>(upstream, actual));
return Infrastructure.onMultiCreation(new MultiRetryWhenOp<>(upstream, onFailurePredicate, actual));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@
public class UniRetry<T> {

private final Uni<T> upstream;
private final Predicate<? super Throwable> predicate;
private final Predicate<? super Throwable> onFailurePredicate;

private Duration initialBackOffDuration = Duration.ofSeconds(1);
private Duration maxBackoffDuration = ExponentialBackoff.MAX_BACKOFF;
private double jitter = ExponentialBackoff.DEFAULT_JITTER;

private boolean backOffConfigured = false;

public UniRetry(Uni<T> upstream, Predicate<? super Throwable> predicate) {
public UniRetry(Uni<T> upstream, Predicate<? super Throwable> onFailurePredicate) {
this.upstream = upstream;
this.predicate = predicate;
this.onFailurePredicate = onFailurePredicate;
}

/**
Expand Down Expand Up @@ -59,17 +59,12 @@ public Uni<T> indefinitely() {
@CheckReturnValue
public Uni<T> atMost(long numberOfAttempts) {
if (!backOffConfigured) {
return Infrastructure.onUniCreation(new UniRetryAtMost<>(upstream, predicate, numberOfAttempts));
return Infrastructure.onUniCreation(new UniRetryAtMost<>(upstream, onFailurePredicate, numberOfAttempts));
} else {
Function<Multi<Throwable>, Publisher<Long>> factory = ExponentialBackoff
.randomExponentialBackoffFunction(numberOfAttempts,
initialBackOffDuration, maxBackoffDuration, jitter, Infrastructure.getDefaultWorkerPool());

if (predicate != null) {
factory = addPredicateToBackoffFactory(factory);
}

return upstream.toMulti().onFailure().retry().when(factory).toUni();
return upstream.toMulti().onFailure(onFailurePredicate).retry().when(factory).toUni();
}
}

Expand All @@ -83,7 +78,6 @@ public Uni<T> atMost(long numberOfAttempts) {
* @param expireAt absolute time in millis that specifies when to give up
* @return a new {@link Uni} retrying to subscribe to the current {@link Uni} until it gets an item or until
* expiration {@code expireAt}. When the expiration is reached, the last failure is propagated.
*
* @throws IllegalArgumentException if back off not configured,
*/
@CheckReturnValue
Expand All @@ -95,28 +89,7 @@ public Uni<T> expireAt(long expireAt) {
Function<Multi<Throwable>, Publisher<Long>> factory = ExponentialBackoff
.randomExponentialBackoffFunctionExpireAt(expireAt,
initialBackOffDuration, maxBackoffDuration, jitter, Infrastructure.getDefaultWorkerPool());

if (predicate != null) {
factory = addPredicateToBackoffFactory(factory);
}

return upstream.toMulti().onFailure().retry().when(factory).toUni();
}

private Function<Multi<Throwable>, Publisher<Long>> addPredicateToBackoffFactory(
Function<Multi<Throwable>, Publisher<Long>> factory) {
return factory.compose(value -> value.onItem()
.transformToUni(throwable -> Uni.createFrom().<Throwable> emitter(emitter -> {
try {
if (predicate.test(throwable)) {
emitter.complete(throwable);
} else {
emitter.fail(throwable);
}
} catch (Throwable err) {
emitter.fail(err);
}
})).concatenate());
return upstream.toMulti().onFailure(onFailurePredicate).retry().when(factory).toUni();
}

/**
Expand All @@ -129,7 +102,6 @@ private Function<Multi<Throwable>, Publisher<Long>> addPredicateToBackoffFactory
* @param expireIn relative time in millis that specifies when to give up
* @return a new {@link Uni} retrying to subscribe to the current {@link Uni} until it gets an item or until
* expiration {@code expireIn}. When the expiration is reached, the last failure is propagated.
*
* @throws IllegalArgumentException if back off not configured,
*/
@CheckReturnValue
Expand Down Expand Up @@ -186,7 +158,7 @@ public Uni<T> when(Function<Multi<Throwable>, ? extends Publisher<?>> whenStream
}
Function<Multi<Throwable>, ? extends Publisher<?>> actual = Infrastructure
.decorate(nonNull(whenStreamFactory, "whenStreamFactory"));
return upstream.toMulti().onFailure().retry().when(actual).toUni();
return upstream.toMulti().onFailure(this.onFailurePredicate).retry().when(actual).toUni();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,32 +1,36 @@
package io.smallrye.mutiny.operators.multi;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

import org.reactivestreams.Publisher;

import io.smallrye.mutiny.CompositeException;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import io.smallrye.mutiny.subscription.SwitchableSubscriptionSubscriber;

/**
* Multi operator re-subscribing to the upstream if if receives a failure event.
* Multi operator re-subscribing to the upstream if it receives a failure event.
* It can re-subscribe indefinitely (passing Long.MAX_VALUE as number of attempts) or a fixed number of times.
*
* @param <T> the type of item
*/
public final class MultiRetryOp<T> extends AbstractMultiOperator<T, T> {

private final long times;
private final Predicate<? super Throwable> onFailurePredicate;

public MultiRetryOp(Multi<? extends T> upstream, long times) {
public MultiRetryOp(Multi<? extends T> upstream, Predicate<? super Throwable> onFailurePredicate, long times) {
super(upstream);
this.onFailurePredicate = onFailurePredicate;
this.times = times;
}

@Override
public void subscribe(MultiSubscriber<? super T> downstream) {
RetrySubscriber<T> subscriber = new RetrySubscriber<>(upstream, downstream, times);
RetrySubscriber<T> subscriber = new RetrySubscriber<>(upstream, onFailurePredicate, downstream, times);

downstream.onSubscribe(subscriber);

Expand All @@ -43,10 +47,14 @@ static final class RetrySubscriber<T> extends SwitchableSubscriptionSubscriber<T
private long remaining;
long produced;

RetrySubscriber(Publisher<? extends T> upstream, MultiSubscriber<? super T> downstream, long attempts) {
private final Predicate<? super Throwable> onFailurePredicate;

RetrySubscriber(Publisher<? extends T> upstream, Predicate<? super Throwable> onFailurePredicate,
MultiSubscriber<? super T> downstream, long attempts) {
super(downstream);
this.upstream = upstream;
this.remaining = attempts;
this.onFailurePredicate = onFailurePredicate;
}

@Override
Expand All @@ -57,6 +65,10 @@ public void onItem(T t) {

@Override
public void onFailure(Throwable t) {
if (testOnFailurePredicate(t)) {
return;
}

long r = remaining;
if (r != Long.MAX_VALUE) {
if (r == 0) {
Expand All @@ -69,6 +81,22 @@ public void onFailure(Throwable t) {
resubscribe();
}

private boolean testOnFailurePredicate(Throwable t) {
// The onFailurePredicate cannot be null.
try {
if (!onFailurePredicate.test(t)) {
cancel();
downstream.onFailure(t);
return true;
}
} catch (Throwable e) {
cancel();
downstream.onFailure(new CompositeException(e, t));
return true;
}
return false;
}

void resubscribe() {
if (wip.getAndIncrement() == 0) {
do {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;

import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import io.smallrye.mutiny.CompositeException;
import io.smallrye.mutiny.Context;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.Subscriptions;
Expand All @@ -29,22 +31,24 @@
public final class MultiRetryWhenOp<T> extends AbstractMultiOperator<T, T> {

private final Function<? super Multi<Throwable>, ? extends Publisher<?>> triggerStreamFactory;
private final Predicate<? super Throwable> onFailurePredicate;

public MultiRetryWhenOp(Multi<? extends T> upstream,
public MultiRetryWhenOp(Multi<? extends T> upstream, Predicate<? super Throwable> onFailurePredicate,
Function<? super Multi<Throwable>, ? extends Publisher<?>> triggerStreamFactory) {
super(upstream);
this.onFailurePredicate = onFailurePredicate;
this.triggerStreamFactory = triggerStreamFactory;
}

private static <T> void subscribe(MultiSubscriber<? super T> downstream,
private static <T> void subscribe(MultiSubscriber<? super T> downstream, Predicate<? super Throwable> onFailurePredicate,
Function<? super Multi<Throwable>, ? extends Publisher<?>> triggerStreamFactory,
Multi<? extends T> upstream) {
TriggerSubscriber other = new TriggerSubscriber();
Subscriber<Throwable> signaller = new SerializedSubscriber<>(other.processor);
signaller.onSubscribe(Subscriptions.empty());
MultiSubscriber<T> serialized = new SerializedSubscriber<>(downstream);

RetryWhenOperator<T> operator = new RetryWhenOperator<>(upstream, serialized, signaller);
RetryWhenOperator<T> operator = new RetryWhenOperator<>(upstream, onFailurePredicate, serialized, signaller);
other.operator = operator;

serialized.onSubscribe(operator);
Expand All @@ -69,7 +73,7 @@ private static <T> void subscribe(MultiSubscriber<? super T> downstream,

@Override
public void subscribe(MultiSubscriber<? super T> downstream) {
subscribe(downstream, triggerStreamFactory, upstream);
subscribe(downstream, onFailurePredicate, triggerStreamFactory, upstream);
}

static final class RetryWhenOperator<T> extends SwitchableSubscriptionSubscriber<T> {
Expand All @@ -78,12 +82,15 @@ static final class RetryWhenOperator<T> extends SwitchableSubscriptionSubscriber
private final AtomicInteger wip = new AtomicInteger();
private final Subscriber<Throwable> signaller;
private final Subscriptions.DeferredSubscription arbiter = new Subscriptions.DeferredSubscription();
private final Predicate<? super Throwable> onFailurePredicate;

long produced;

RetryWhenOperator(Publisher<? extends T> upstream, MultiSubscriber<? super T> downstream,
RetryWhenOperator(Publisher<? extends T> upstream, Predicate<? super Throwable> onFailurePredicate,
MultiSubscriber<? super T> downstream,
Subscriber<Throwable> signaller) {
super(downstream);
this.onFailurePredicate = onFailurePredicate;
this.upstream = upstream;
this.signaller = signaller;
}
Expand All @@ -109,6 +116,10 @@ public void onItem(T t) {

@Override
public void onFailure(Throwable t) {
if (testOnFailurePredicate(t)) {
return;
}

long p = produced;
if (p != 0L) {
produced = 0;
Expand All @@ -118,6 +129,20 @@ public void onFailure(Throwable t) {
signaller.onNext(t);
}

private boolean testOnFailurePredicate(Throwable t) {
try {
if (!onFailurePredicate.test(t)) {
arbiter.cancel();
downstream.onFailure(t);
}
} catch (Throwable e) {
arbiter.cancel();
downstream.onFailure(new CompositeException(e, t));
return true;
}
return false;
}

@Override
public void onCompletion() {
arbiter.cancel();
Expand Down
Loading

0 comments on commit d963c73

Please sign in to comment.