diff --git a/src/main/java/io/r2dbc/pool/ConnectionPool.java b/src/main/java/io/r2dbc/pool/ConnectionPool.java index c66a3eb..ef2b68f 100644 --- a/src/main/java/io/r2dbc/pool/ConnectionPool.java +++ b/src/main/java/io/r2dbc/pool/ConnectionPool.java @@ -28,7 +28,6 @@ import reactor.pool.PoolBuilder; import reactor.pool.PoolConfig; import reactor.pool.PoolMetricsRecorder; -import reactor.pool.PooledRef; import reactor.pool.PooledRefMetadata; import reactor.util.Logger; import reactor.util.Loggers; @@ -45,8 +44,6 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiPredicate; import java.util.function.Consumer; import java.util.function.Function; @@ -80,6 +77,7 @@ public class ConnectionPool implements ConnectionFactory, Disposable, Closeable, * @param configuration the configuration to use for building the connection pool. * @throws IllegalArgumentException if {@code configuration} is {@code null} */ + @SuppressWarnings("unchecked") public ConnectionPool(ConnectionPoolConfiguration configuration) { this.connectionPool = createConnectionPool(Assert.requireNonNull(configuration, "ConnectionPoolConfiguration must not be null")); this.factory = configuration.getConnectionFactory(); @@ -99,35 +97,22 @@ public ConnectionPool(ConnectionPoolConfiguration configuration) { Mono<Connection> create = Mono.defer(() -> { - AtomicReference<PooledRef<Connection>> emitted = new AtomicReference<>(); - AtomicBoolean cancelled = new AtomicBoolean(); - Mono<Connection> mono = this.connectionPool.acquire() - .doOnNext(emitted::set) - .doOnSubscribe(subscription -> { + .flatMap(ref -> { if (logger.isDebugEnabled()) { logger.debug("Obtaining new connection from the pool"); } - }) - .flatMap(ref -> { - Mono<Connection> conn = getValidConnection(allocateValidation, ref); + PooledConnection connection = new PooledConnection(ref); + Mono<Connection> conn = getValidConnection(allocateValidation, connection).onErrorResume(throwable -> ref.invalidate().then(Mono.error(throwable))); - return conn.onErrorResume(throwable -> { - emitted.set(null); // prevent release on cancel - return ref.invalidate().then(Mono.error(throwable)); - }) - .doFinally(s -> cleanup(cancelled, emitted)); + return Operators.discardOnCancel(conn, () -> { + ref.release().subscribe(); + return false; + }); }) - .as(self -> Operators.discardOnCancel(self, () -> { - - // cancel upstream to interrupt connection allocation. - cancelled.set(true); - return emitted.get() == null; - })) - .name(acqName) - .doOnNext(it -> emitted.set(null)); + .name(acqName); if (!this.maxAcquireTime.isZero()) { mono = mono.timeout(this.maxAcquireTime).onErrorMap(TimeoutException.class, e -> new R2dbcTimeoutException(timeoutMessage, e)); @@ -135,23 +120,10 @@ public ConnectionPool(ConnectionPoolConfiguration configuration) { return mono; }); this.create = configuration.getAcquireRetry() > 0 ? create.retry(configuration.getAcquireRetry()) : create; - } - - static void cleanup(AtomicBoolean cancelled, AtomicReference<PooledRef<Connection>> emitted) { - - if (cancelled.compareAndSet(true, false)) { - PooledRef<Connection> savedRef = emitted.get(); - if (savedRef != null && emitted.compareAndSet(savedRef, null)) { - logger.debug("Releasing connection after cancellation"); - savedRef.release().subscribe(ignore -> { - }, e -> logger.warn("Error during release", e)); - } - } } - private Mono<Connection> getValidConnection(Function<Connection, Mono<Void>> allocateValidation, PooledRef<Connection> ref) { - PooledConnection connection = new PooledConnection(ref); + private Mono<Connection> getValidConnection(Function<Connection, Mono<Void>> allocateValidation, Connection connection) { return allocateValidation.apply(connection).thenReturn(connection); } diff --git a/src/main/java/io/r2dbc/pool/MonoDiscardOnCancel.java b/src/main/java/io/r2dbc/pool/MonoDiscardOnCancel.java index b4ebbbf..916b5b1 100644 --- a/src/main/java/io/r2dbc/pool/MonoDiscardOnCancel.java +++ b/src/main/java/io/r2dbc/pool/MonoDiscardOnCancel.java @@ -115,8 +115,8 @@ public void request(long n) { public void cancel() { if (compareAndSet(false, true)) { - if (logger.isDebugEnabled()) { - logger.debug("received cancel signal"); + if (logger.isTraceEnabled()) { + logger.trace("received cancel signal"); } try { boolean mayCancelUpstream = this.cancelConsumer.getAsBoolean(); diff --git a/src/test/java/io/r2dbc/pool/ConnectionPoolUnitTests.java b/src/test/java/io/r2dbc/pool/ConnectionPoolUnitTests.java index e13dbaf..5cd718d 100644 --- a/src/test/java/io/r2dbc/pool/ConnectionPoolUnitTests.java +++ b/src/test/java/io/r2dbc/pool/ConnectionPoolUnitTests.java @@ -843,7 +843,7 @@ void cancelDuringAllocationShouldCompleteAtomically() throws InterruptedExceptio CountDownLatch validateLatch = new CountDownLatch(1); CountDownLatch postLatch = new CountDownLatch(1); AtomicBoolean seenCancel = new AtomicBoolean(); - Mono<Boolean> validate = Mono.just(true).doOnSuccess(s -> validateLatch.countDown()).delayElement(Duration.ofSeconds(1)).doOnSuccess(s -> postLatch.countDown()).doOnCancel(() -> { + Mono<Boolean> validate = Mono.just(true).doOnSuccess(s -> validateLatch.countDown()).delayElement(Duration.ofMillis(250)).doOnSuccess(s -> postLatch.countDown()).doOnCancel(() -> { seenCancel.set(true); }); @@ -860,7 +860,7 @@ void cancelDuringAllocationShouldCompleteAtomically() throws InterruptedExceptio postLatch.await(); PoolMetrics poolMetrics = pool.getMetrics().get(); - await().atMost(Duration.ofSeconds(1)).until(() -> poolMetrics.idleSize() == 10); + await().atMost(Duration.ofSeconds(2)).until(() -> poolMetrics.idleSize() == 10); assertThat(seenCancel).isFalse(); assertThat(poolMetrics.pendingAcquireSize()).isEqualTo(0);