Skip to content

Commit

Permalink
Ensure resource cleanup during cancellation of the validation query.
Browse files Browse the repository at this point in the history
[#144]

Signed-off-by: Mark Paluch <[email protected]>
  • Loading branch information
mp911de committed Nov 10, 2021
1 parent 3c21501 commit a3611e2
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 45 deletions.
52 changes: 13 additions & 39 deletions src/main/java/io/r2dbc/pool/ConnectionPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import reactor.pool.PoolBuilder;
import reactor.pool.PoolConfig;
import reactor.pool.PoolMetricsRecorder;
import reactor.pool.PooledRef;
import reactor.pool.PooledRefMetadata;
import reactor.util.Logger;
import reactor.util.Loggers;
Expand All @@ -48,8 +47,6 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -86,6 +83,7 @@ public class ConnectionPool implements ConnectionFactory, Disposable, Closeable,
* @param configuration the configuration to use for building the connection pool.
* @throws IllegalArgumentException if {@code configuration} is {@code null}
*/
@SuppressWarnings("unchecked")
public ConnectionPool(ConnectionPoolConfiguration configuration) {
this.connectionPool = createConnectionPool(Assert.requireNonNull(configuration, "ConnectionPoolConfiguration must not be null"));
this.factory = configuration.getConnectionFactory();
Expand All @@ -106,18 +104,12 @@ public ConnectionPool(ConnectionPoolConfiguration configuration) {

Mono<Connection> create = Mono.defer(() -> {

AtomicReference<PooledRef<Connection>> emitted = new AtomicReference<>();
AtomicBoolean cancelled = new AtomicBoolean();

Mono<Connection> mono = this.connectionPool.acquire()
.doOnNext(emitted::set)
.doOnSubscribe(subscription -> {
.flatMap(ref -> {

if (logger.isDebugEnabled()) {
logger.debug("Obtaining new connection from the pool");
}
})
.flatMap(ref -> {

Mono<Void> prepare = null;
if (ref.poolable() instanceof Lifecycle) {
Expand All @@ -130,51 +122,33 @@ public ConnectionPool(ConnectionPoolConfiguration configuration) {
prepare = prepare == null ? postAllocate : prepare.then(postAllocate);
}

PooledConnection connection = new PooledConnection(ref, this.preRelease);
Mono<Connection> conn;
if (prepare == null) {
conn = getValidConnection(allocateValidation, ref);
conn = getValidConnection(allocateValidation, connection);
} else {
conn = prepare.then(getValidConnection(allocateValidation, ref));
conn = prepare.then(getValidConnection(allocateValidation, connection));
}

return conn.onErrorResume(throwable -> {
emitted.set(null); // prevent release on cancel
return ref.invalidate().then(Mono.error(throwable));
})
.doFinally(s -> cleanup(cancelled, emitted));
})
.as(self -> Operators.discardOnCancel(self, () -> {
conn = conn.onErrorResume(throwable -> ref.invalidate().then(Mono.error(throwable)));

// cancel upstream to interrupt connection allocation.
cancelled.set(true);
return emitted.get() == null;
}))
.name(acqName)
.doOnNext(it -> emitted.set(null));
return Operators.discardOnCancel(conn, () -> {
ref.release().subscribe();
return false;
});
})
.name(acqName);

if (!this.maxAcquireTime.isNegative()) {
mono = mono.timeout(this.maxAcquireTime).onErrorMap(TimeoutException.class, e -> new R2dbcTimeoutException(timeoutMessage, e));
}
return mono;
});
this.create = configuration.getAcquireRetry() > 0 ? create.retry(configuration.getAcquireRetry()) : create;
}

static void cleanup(AtomicBoolean cancelled, AtomicReference<PooledRef<Connection>> emitted) {

if (cancelled.compareAndSet(true, false)) {

PooledRef<Connection> savedRef = emitted.get();
if (savedRef != null && emitted.compareAndSet(savedRef, null)) {
logger.debug("Releasing connection after cancellation");
savedRef.release().subscribe(ignore -> {
}, e -> logger.warn("Error during release", e));
}
}
}

private Mono<Connection> getValidConnection(Function<Connection, Mono<Void>> allocateValidation, PooledRef<Connection> ref) {
PooledConnection connection = new PooledConnection(ref, this.preRelease);
private Mono<Connection> getValidConnection(Function<Connection, Mono<Void>> allocateValidation, Connection connection) {
return allocateValidation.apply(connection).thenReturn(connection);
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/r2dbc/pool/MonoDiscardOnCancel.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ public void request(long n) {
public void cancel() {

if (compareAndSet(false, true)) {
if (logger.isDebugEnabled()) {
logger.debug("received cancel signal");
if (logger.isTraceEnabled()) {
logger.trace("received cancel signal");
}
try {
boolean mayCancelUpstream = this.cancelConsumer.getAsBoolean();
Expand Down
8 changes: 4 additions & 4 deletions src/test/java/io/r2dbc/pool/ConnectionPoolUnitTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ void shouldTimeoutCreateConnection() {
ConnectionFactory connectionFactoryMock = mock(ConnectionFactory.class);
Connection connectionMock = mock(Connection.class);
when(connectionFactoryMock.create()).thenReturn((Publisher) Mono.defer(() ->
Mono.delay(Duration.ofSeconds(15)).thenReturn(connectionMock))
Mono.delay(Duration.ofSeconds(5)).thenReturn(connectionMock))
);

ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock)
Expand All @@ -239,7 +239,7 @@ void shouldTimeoutCreateConnection() {

StepVerifier.withVirtualTime(pool::create)
.expectSubscription()
.thenAwait(Duration.ofSeconds(11))
.thenAwait(Duration.ofSeconds(2))
.expectError(TimeoutException.class)
.verify();

Expand Down Expand Up @@ -942,7 +942,7 @@ void cancelDuringAllocationShouldCompleteAtomically() throws InterruptedExceptio
Mono<Void> prepare = Mono.<Void>empty().delayElement(Duration.ofMillis(100)).doOnSuccess(s -> prepareLatch.countDown()).doOnCancel(() -> {
seenCancel.set(true);
});
Mono<Boolean> validate = Mono.just(true).delayElement(Duration.ofSeconds(1)).doOnSuccess(s -> validateLatch.countDown()).doOnCancel(() -> {
Mono<Boolean> validate = Mono.just(true).delayElement(Duration.ofMillis(250)).doOnSuccess(s -> validateLatch.countDown()).doOnCancel(() -> {
seenCancel.set(true);
});

Expand All @@ -960,7 +960,7 @@ void cancelDuringAllocationShouldCompleteAtomically() throws InterruptedExceptio
validateLatch.await();

PoolMetrics poolMetrics = pool.getMetrics().get();
await().atMost(Duration.ofSeconds(1)).until(() -> poolMetrics.idleSize() == 10);
await().atMost(Duration.ofSeconds(2)).until(() -> poolMetrics.idleSize() == 10);

assertThat(seenCancel).isFalse();
assertThat(poolMetrics.pendingAcquireSize()).isEqualTo(0);
Expand Down

0 comments on commit a3611e2

Please sign in to comment.