Skip to content

Commit

Permalink
Polishing.
Browse files Browse the repository at this point in the history
Propagate subscriber context. Allow conditional cancellation propagation.

[#140]

Signed-off-by: Mark Paluch <[email protected]>
  • Loading branch information
mp911de committed Oct 28, 2021
1 parent d9363cd commit ddbc65b
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 15 deletions.
10 changes: 7 additions & 3 deletions src/main/java/io/r2dbc/pool/ConnectionPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,14 @@ public ConnectionPool(ConnectionPoolConfiguration configuration) {
emitted.set(null); // prevent release on cancel
return ref.invalidate().then(Mono.error(throwable));
})
.doFinally(s -> cleanup(cancelled, emitted))
.as(self -> Operators.discardOnCancel(self, () -> cancelled.set(true)));
.doFinally(s -> cleanup(cancelled, emitted));
})
.as(self -> Operators.discardOnCancel(self, () -> cancelled.set(true)))
.as(self -> Operators.discardOnCancel(self, () -> {

// cancel upstream to interrupt connection allocation.
cancelled.set(true);
return emitted.get() == null;
}))
.name(acqName)
.doOnNext(it -> emitted.set(null));

Expand Down
20 changes: 15 additions & 5 deletions src/main/java/io/r2dbc/pool/MonoDiscardOnCancel.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import reactor.util.context.Context;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

/**
* A decorating operator that replays signals from its source to a {@link Subscriber} and drains the source upon {@link Subscription#cancel() cancel} and drops data signals until termination.
Expand All @@ -37,9 +38,9 @@ class MonoDiscardOnCancel<T> extends MonoOperator<T, T> {

private static final Logger logger = Loggers.getLogger(MonoDiscardOnCancel.class);

private final Runnable cancelConsumer;
private final BooleanSupplier cancelConsumer;

MonoDiscardOnCancel(Mono<? extends T> source, Runnable cancelConsumer) {
MonoDiscardOnCancel(Mono<? extends T> source, BooleanSupplier cancelConsumer) {
super(source);
this.cancelConsumer = cancelConsumer;
}
Expand All @@ -55,17 +56,22 @@ static class MonoDiscardOnCancelSubscriber<T> extends AtomicBoolean implements C

final Context ctx;

final Runnable cancelConsumer;
final BooleanSupplier cancelConsumer;

Subscription s;

MonoDiscardOnCancelSubscriber(CoreSubscriber<T> actual, Runnable cancelConsumer) {
MonoDiscardOnCancelSubscriber(CoreSubscriber<T> actual, BooleanSupplier cancelConsumer) {

this.actual = actual;
this.ctx = actual.currentContext();
this.cancelConsumer = cancelConsumer;
}

@Override
public Context currentContext() {
return this.ctx;
}

@Override
public void onSubscribe(Subscription s) {

Expand Down Expand Up @@ -113,7 +119,11 @@ public void cancel() {
logger.debug("received cancel signal");
}
try {
this.cancelConsumer.run();
boolean mayCancelUpstream = this.cancelConsumer.getAsBoolean();
if (mayCancelUpstream) {
this.s.cancel();
return;
}
} catch (Exception e) {
Operators.onErrorDropped(e, this.ctx);
}
Expand Down
16 changes: 9 additions & 7 deletions src/main/java/io/r2dbc/pool/Operators.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.reactivestreams.Subscription;
import reactor.core.publisher.Mono;

import java.util.function.BooleanSupplier;

/**
* Operator utility.
*
Expand All @@ -40,8 +42,7 @@ private Operators() {
* @return decorated {@link Mono}.
*/
public static <T> Mono<T> discardOnCancel(Mono<? extends T> source) {
return new MonoDiscardOnCancel<>(source, () -> {
});
return new MonoDiscardOnCancel<>(source, () -> false);
}

/**
Expand All @@ -51,13 +52,14 @@ public static <T> Mono<T> discardOnCancel(Mono<? extends T> source) {
* previous frames on the stack.
* <p>Propagate the {@link Subscription#cancel()} signal to a {@link Runnable consumer}.
*
* @param source the source to decorate.
* @param cancelConsumer {@link Runnable} notified when the resulting {@link Mono} receives a {@link Subscription#cancel() cancel} signal.
* @param <T> The type of values in both source and output sequences.
* @param source the source to decorate.
* @param onCancel {@link BooleanSupplier} notified when the resulting {@link Mono} receives a {@link Subscription#cancel() cancel} signal. Expects the callback to return a boolean flag whether
* to propagate the cancellation signal upstream via {@code true} or {@code false} to drain the items and suppress upstream cancellation.
* @param <T> The type of values in both source and output sequences.
* @return decorated {@link Mono}.
*/
public static <T> Mono<T> discardOnCancel(Mono<? extends T> source, Runnable cancelConsumer) {
return new MonoDiscardOnCancel<>(source, cancelConsumer);
public static <T> Mono<T> discardOnCancel(Mono<? extends T> source, BooleanSupplier onCancel) {
return new MonoDiscardOnCancel<>(source, onCancel);
}

}

0 comments on commit ddbc65b

Please sign in to comment.