Skip to content

Commit

Permalink
Ensure resource cleanup during cancellation of the validation query.
Browse files Browse the repository at this point in the history
[#144]

Signed-off-by: Mark Paluch <[email protected]>
  • Loading branch information
mp911de committed Nov 10, 2021
1 parent ddbc65b commit a951b79
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 42 deletions.
48 changes: 10 additions & 38 deletions src/main/java/io/r2dbc/pool/ConnectionPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import reactor.pool.PoolBuilder;
import reactor.pool.PoolConfig;
import reactor.pool.PoolMetricsRecorder;
import reactor.pool.PooledRef;
import reactor.pool.PooledRefMetadata;
import reactor.util.Logger;
import reactor.util.Loggers;
Expand All @@ -45,8 +44,6 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -80,6 +77,7 @@ public class ConnectionPool implements ConnectionFactory, Disposable, Closeable,
* @param configuration the configuration to use for building the connection pool.
* @throws IllegalArgumentException if {@code configuration} is {@code null}
*/
@SuppressWarnings("unchecked")
public ConnectionPool(ConnectionPoolConfiguration configuration) {
this.connectionPool = createConnectionPool(Assert.requireNonNull(configuration, "ConnectionPoolConfiguration must not be null"));
this.factory = configuration.getConnectionFactory();
Expand All @@ -99,59 +97,33 @@ public ConnectionPool(ConnectionPoolConfiguration configuration) {

Mono<Connection> create = Mono.defer(() -> {

AtomicReference<PooledRef<Connection>> emitted = new AtomicReference<>();
AtomicBoolean cancelled = new AtomicBoolean();

Mono<Connection> mono = this.connectionPool.acquire()
.doOnNext(emitted::set)
.doOnSubscribe(subscription -> {
.flatMap(ref -> {

if (logger.isDebugEnabled()) {
logger.debug("Obtaining new connection from the pool");
}
})
.flatMap(ref -> {

Mono<Connection> conn = getValidConnection(allocateValidation, ref);
PooledConnection connection = new PooledConnection(ref);
Mono<Connection> conn = getValidConnection(allocateValidation, connection).onErrorResume(throwable -> ref.invalidate().then(Mono.error(throwable)));

return conn.onErrorResume(throwable -> {
emitted.set(null); // prevent release on cancel
return ref.invalidate().then(Mono.error(throwable));
})
.doFinally(s -> cleanup(cancelled, emitted));
return Operators.discardOnCancel(conn, () -> {
ref.release().subscribe();
return false;
});
})
.as(self -> Operators.discardOnCancel(self, () -> {

// cancel upstream to interrupt connection allocation.
cancelled.set(true);
return emitted.get() == null;
}))
.name(acqName)
.doOnNext(it -> emitted.set(null));
.name(acqName);

if (!this.maxAcquireTime.isZero()) {
mono = mono.timeout(this.maxAcquireTime).onErrorMap(TimeoutException.class, e -> new R2dbcTimeoutException(timeoutMessage, e));
}
return mono;
});
this.create = configuration.getAcquireRetry() > 0 ? create.retry(configuration.getAcquireRetry()) : create;
}

static void cleanup(AtomicBoolean cancelled, AtomicReference<PooledRef<Connection>> emitted) {

if (cancelled.compareAndSet(true, false)) {

PooledRef<Connection> savedRef = emitted.get();
if (savedRef != null && emitted.compareAndSet(savedRef, null)) {
logger.debug("Releasing connection after cancellation");
savedRef.release().subscribe(ignore -> {
}, e -> logger.warn("Error during release", e));
}
}
}

private Mono<Connection> getValidConnection(Function<Connection, Mono<Void>> allocateValidation, PooledRef<Connection> ref) {
PooledConnection connection = new PooledConnection(ref);
private Mono<Connection> getValidConnection(Function<Connection, Mono<Void>> allocateValidation, Connection connection) {
return allocateValidation.apply(connection).thenReturn(connection);
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/r2dbc/pool/MonoDiscardOnCancel.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ public void request(long n) {
public void cancel() {

if (compareAndSet(false, true)) {
if (logger.isDebugEnabled()) {
logger.debug("received cancel signal");
if (logger.isTraceEnabled()) {
logger.trace("received cancel signal");
}
try {
boolean mayCancelUpstream = this.cancelConsumer.getAsBoolean();
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/io/r2dbc/pool/ConnectionPoolUnitTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,7 @@ void cancelDuringAllocationShouldCompleteAtomically() throws InterruptedExceptio
CountDownLatch validateLatch = new CountDownLatch(1);
CountDownLatch postLatch = new CountDownLatch(1);
AtomicBoolean seenCancel = new AtomicBoolean();
Mono<Boolean> validate = Mono.just(true).doOnSuccess(s -> validateLatch.countDown()).delayElement(Duration.ofSeconds(1)).doOnSuccess(s -> postLatch.countDown()).doOnCancel(() -> {
Mono<Boolean> validate = Mono.just(true).doOnSuccess(s -> validateLatch.countDown()).delayElement(Duration.ofMillis(250)).doOnSuccess(s -> postLatch.countDown()).doOnCancel(() -> {
seenCancel.set(true);
});

Expand All @@ -860,7 +860,7 @@ void cancelDuringAllocationShouldCompleteAtomically() throws InterruptedExceptio
postLatch.await();

PoolMetrics poolMetrics = pool.getMetrics().get();
await().atMost(Duration.ofSeconds(1)).until(() -> poolMetrics.idleSize() == 10);
await().atMost(Duration.ofSeconds(2)).until(() -> poolMetrics.idleSize() == 10);

assertThat(seenCancel).isFalse();
assertThat(poolMetrics.pendingAcquireSize()).isEqualTo(0);
Expand Down

0 comments on commit a951b79

Please sign in to comment.