Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

spring r2dbc-psql throws: Failed to obtain R2DBC Connection #1608

Closed
GeorgePap-719 opened this issue Sep 13, 2023 · 9 comments
Closed

spring r2dbc-psql throws: Failed to obtain R2DBC Connection #1608

GeorgePap-719 opened this issue Sep 13, 2023 · 9 comments
Labels
status: invalid An issue that we don't feel is valid

Comments

@GeorgePap-719
Copy link
Contributor

I have a simple project where i do some concurrent inserts:

    List<Mono<EventEntity>> monos = new ArrayList<>();
    for (int i = 0; i < 99; i++) {
      Mono<EventEntity> saved = eventRepository.save(event);
      monos.add(saved);
    }
    AtomicInteger i = new AtomicInteger();
    i.set(0);
    Flux
        .fromIterable(monos)
        .flatMap(eventEntityMono -> eventEntityMono.then().thenReturn(i.getAndIncrement())) // concurrent op
        .doOnEach(System.out::println)
        .doOnEach(it -> System.out.println(Thread.currentThread().getName()))
        .then()
        .block();

application properties:

spring.r2dbc.pool.enabled=true
spring.r2dbc.pool.max-size=10

I was under the impression that if i enable the pooling feature, then the connections should never exceed 10. Is this a bug or am i understanding something incorecctly ?

Logs:

org.springframework.dao.DataAccessResourceFailureException: Failed to obtain R2DBC Connection

	at org.springframework.r2dbc.connection.ConnectionFactoryUtils.lambda$getConnection$0(ConnectionFactoryUtils.java:90)
	at reactor.core.publisher.Mono.lambda$onErrorMap$28(Mono.java:3783)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106)
	at org.springframework.security.test.context.support.ReactorContextTestExecutionListener$DelegateTestExecutionListener$SecuritySubContext.onError(ReactorContextTestExecutionListener.java:125)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:142)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121)
	at reactor.core.publisher.MonoSingle$SingleSubscriber.onError(MonoSingle.java:150)
	at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onError(FluxDiscardOnCancel.java:97)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106)
	at org.springframework.security.test.context.support.ReactorContextTestExecutionListener$DelegateTestExecutionListener$SecuritySubContext.onError(ReactorContextTestExecutionListener.java:125)
	at reactor.core.publisher.Operators.error(Operators.java:198)
	at reactor.core.publisher.MonoError.subscribe(MonoError.java:53)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4495)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:180)
	at reactor.core.publisher.MonoDelayUntil$DelayUntilTrigger.onError(MonoDelayUntil.java:514)
	at org.springframework.security.test.context.support.ReactorContextTestExecutionListener$DelegateTestExecutionListener$SecuritySubContext.onError(ReactorContextTestExecutionListener.java:125)
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onError(FluxHandle.java:213)
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:135)
	at reactor.core.publisher.FluxHandle$HandleConditionalSubscriber.onNext(FluxHandle.java:343)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:814)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:739)
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:161)
	at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.emit(ReactorNettyClient.java:684)
	at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:936)
	at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:810)
	at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:716)
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:129)
	at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
	at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:292)
	at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:401)
	at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:411)
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:499)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:857)
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:103)
		at reactor.core.publisher.Mono.block(Mono.java:1712)
		at org.hellenicarmy.operationalmap.server.repository.EventRepositoryTest.testSave(EventRepositoryTest.java:101)
		at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
		at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
		at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
		at java.base/java.lang.reflect.Method.invoke(Method.java:568)
		at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
		at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
		at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
		at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
		at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
		at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
		at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
		at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
		at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
		at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
		at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
		at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
		at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
		at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
		at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217)
		at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
		at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213)
		at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138)
		at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
		at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
		at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
		at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
		at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
		at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
		at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
		at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
		at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
		at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
		at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
		at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
		at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
		at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
		at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
		at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
		at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
		at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
		at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147)
		at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:127)
		at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:90)
		at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:55)
		at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:102)
		at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:54)
		at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
		at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
		at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
		at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
		at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:57)
		at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
		at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
		at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
		at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:232)
		at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:55)
Caused by: io.r2dbc.postgresql.ExceptionFactory$PostgresqlNonTransientResourceException: [53300] sorry, too many clients already
	at io.r2dbc.postgresql.ExceptionFactory.createException(ExceptionFactory.java:109)
	at io.r2dbc.postgresql.ExceptionFactory.createException(ExceptionFactory.java:65)
	at io.r2dbc.postgresql.ExceptionFactory.handleErrorResponse(ExceptionFactory.java:132)
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:113)
	... 35 more

If this issue is more relevant to r2dbc-postgresql or to r2dbc-pool let me know.

springboot:3.1.2
postgresql:15.3

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label Sep 13, 2023
@mp911de
Copy link
Member

mp911de commented Sep 13, 2023

You're allocating more connections than permitted: [53300] sorry, too many clients already

flatMap introduces concurrency, so you want to limit concurrency there.

@mp911de mp911de closed this as not planned Won't fix, can't repro, duplicate, stale Sep 13, 2023
@mp911de mp911de added status: invalid An issue that we don't feel is valid and removed status: waiting-for-triage An issue we've not yet triaged labels Sep 13, 2023
@GeorgePap-719
Copy link
Contributor Author

GeorgePap-719 commented Sep 13, 2023

The thing is, it breaks on my app server where different threads are calling eventRepository.save(event);.

That's why i reproduced it with flatMap().

I was under the impression that if i enable the pooling feature, then the connections should never exceed 10.

Shouldn't the pool handle the concurrency as it requests for new connection on each request?.
Otherwise what's the recommended approach?

@mp911de
Copy link
Member

mp911de commented Sep 13, 2023

While the pool limits concurrency, the pool might still be configured to allow more connections than your server does.

@GeorgePap-719
Copy link
Contributor Author

I'm using the default one spring provides me by injecting R2dbcEntityTemplate. Is there any chance that i have misconfigured something?

The only place where i write some config code for r2dbc is here:

@Configuration
//@EnableR2dbcRepositories
@EnableTransactionManagement
public class R2dbcConfig extends AbstractR2dbcConfiguration {

  private final String username;
  private final String password;
  private final String url;
  private final String database;
  private final List<Converter<Object, Object>> converters;

  R2dbcConfig(
      @Value("${spring.r2dbc.username}") String username,
      @Value("${spring.r2dbc.password}") String password,
      @Value("${spring.r2dbc.url}") String url,
      @Value("${spring.r2dbc.database}") String database,
      @Autowired List<Converter<Object, Object>> converters
  ) {
    this.username = username;
    this.password = password;
    this.url = url;
    this.database = database;
    this.converters = converters;
  }

  @Bean
  public ReactiveTransactionManager transactionManager(ConnectionFactory connectionFactory) {
    return new R2dbcTransactionManager(connectionFactory);
  }

  @Override
  @Bean
  public @NonNull ConnectionFactory connectionFactory() {
    return ConnectionFactoryBuilder
        .withUrl(url)
        .username(username)
        .password(password)
        .database(database)
        .build();
  }

  @Override
  protected @NonNull List<Object> getCustomConverters() {
    List<Object> customConverters = new ArrayList<>();
    customConverters.add(new LocalDateTimeToLongConverter());
    customConverters.add(new JsonToMapConverter());
    customConverters.addAll(converters);
    customConverters.addAll(PostgresDialect.INSTANCE.getConverters());
    return customConverters;
  }
}

@mp911de
Copy link
Member

mp911de commented Sep 13, 2023

Unless your R2DBC URL starts with r2dbc:pool:…, you're not using the connection pool. Check out the pool config at https://github.com/r2dbc/r2dbc-pool#getting-started

@GeorgePap-719
Copy link
Contributor Author

I tested that option in the morning but it threw an exception that stated: i cannot use both configs app.props and pool in url. I cannot post the exception atm since the code is on my "work" computer, i will get back at you tomorrow.

Thanks for the help though !

@GeorgePap-719
Copy link
Contributor Author

Hey, here is the exception if I enable the pooling feature both in URL and application.properties:

Caused by: java.lang.IllegalStateException: Error processing condition on org.springframework.boot.autoconfigure.r2dbc.ConnectionFactoryConfigurations$PoolConfiguration
	at org.springframework.boot.autoconfigure.condition.SpringBootCondition.matches(SpringBootCondition.java:60)
	at org.springframework.context.annotation.ConditionEvaluator.shouldSkip(ConditionEvaluator.java:108)
	at org.springframework.context.annotation.ConfigurationClassParser.processConfigurationClass(ConfigurationClassParser.java:219)
	at org.springframework.context.annotation.ConfigurationClassParser.processImports(ConfigurationClassParser.java:514)
	... 87 more
Caused by: org.springframework.boot.autoconfigure.r2dbc.MultipleConnectionPoolConfigurationsException: R2DBC connection pooling configuration should be provided by either the spring.r2dbc.pool.* properties or the spring.r2dbc.url property but both have been used.
	at org.springframework.boot.autoconfigure.r2dbc.ConnectionFactoryConfigurations$PooledConnectionFactoryCondition.getMatchOutcome(ConnectionFactoryConfigurations.java:151)
	at org.springframework.boot.autoconfigure.condition.SpringBootCondition.matches(SpringBootCondition.java:47)
	... 90 more

But it seems choosing to enable the feature through URL, it works and Im not getting the error: too many clients.
My properties:

spring.r2dbc.url=r2dbc:pool:postgresql://x:x/x?maxSize=40

Thanks for the help.

So maybe there is a bug in enabling the pooling through application.properties:

spring.r2dbc.pool.enabled=true
spring.r2dbc.pool.max-size=10

@mp911de
Copy link
Member

mp911de commented Sep 14, 2023

If you use properties-based configuration, then please remove your R2dbcConfig class as Spring Boot will back off if it sees that R2DBC infrastructure components are defined already by your config.

@GeorgePap-719
Copy link
Contributor Author

Oh i see, that makes sense.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
status: invalid An issue that we don't feel is valid
Projects
None yet
Development

No branches or pull requests

3 participants