Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transaction rollback does not work when using AbstractRoutingConnectionFactory. #794

Closed
stonegyu opened this issue Oct 21, 2022 · 10 comments
Labels
for: stackoverflow A question that's better suited to stackoverflow.com

Comments

@stonegyu
Copy link

stonegyu commented Oct 21, 2022

hello

When using AbstractRoutingConnectionFactory, Transaction Rollback does not work.

dependencies :

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-dependencies</artifactId>
    <version>2.5.13</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-r2dbc</artifactId>
    <version>2.5.13</version>
</dependency>

configuration information

@Slf4j
@Configuration
@EnableR2dbcRepositories
@EnableR2dbcAuditing
@EnableTransactionManagement
class DataR2dbcConfig : AbstractR2dbcConfiguration() {
    // Omit r2dbc connection information..

    @Primary
    @Bean("connectionFactory")
    override fun connectionFactory(): ConnectionFactory {

        val primaryConnectionFactory = primaryConnectionFactory()

        val secondaryConnectionFactory = secondaryConnectionFactory()

        val routingConnectionFactory: AbstractRoutingConnectionFactory = object : AbstractRoutingConnectionFactory() {
            override fun determineCurrentLookupKey(): Mono<Any> {
                return TransactionSynchronizationManager.forCurrentTransaction().map {
                    if (it.isActualTransactionActive) {
                        if (it.isCurrentTransactionReadOnly) "secondary" else "primary"
                    } else {
                        "secondary"
                    }
                }
            }
        }
        val factories = mapOf("primary" to primaryConnectionFactory, "secondary" to secondaryConnectionFactory)

        routingConnectionFactory.setLenientFallback(true)
        routingConnectionFactory.setDefaultTargetConnectionFactory(secondaryConnectionFactory)
        routingConnectionFactory.setTargetConnectionFactories(factories)

        return routingConnectionFactory
    }

    @Primary
    @Bean("reactiveTransactionManager")
    fun reactiveTransactionManager(@Qualifier("connectionFactory") connectionFactory: ConnectionFactory): ReactiveTransactionManager {
        return R2dbcTransactionManager(TransactionAwareConnectionFactoryProxy(connectionFactory))
    }
}

Transaction rollback worked normally when using only the primary connection factory.
When using AbstractRoutingConnectionFactory, the transaction doesn't seem to work properly.

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label Oct 21, 2022
@mp911de
Copy link
Member

mp911de commented Oct 24, 2022

If you would like us to spend some time helping you to diagnose the problem, please spend some time describing it and, ideally, providing a minimal sample that reproduces the problem.

@mp911de mp911de added the status: waiting-for-feedback We need additional information before we can continue label Oct 24, 2022
@stonegyu
Copy link
Author

@spring-projects-issues spring-projects-issues added status: feedback-provided Feedback has been provided and removed status: waiting-for-feedback We need additional information before we can continue labels Oct 25, 2022
@stonegyu
Copy link
Author

stonegyu commented Nov 9, 2022

@mp911de
It's been 2 weeks since you asked. Should I wait a bit more for an answer?

@mp911de
Copy link
Member

mp911de commented Nov 10, 2022

I still need to investigate what's going on but had not any bandwidth to do so.

@stonegyu
Copy link
Author

stonegyu commented Nov 10, 2022

@mp911de
Okay..Are you saying you don't have time to fix the issue right now?
How much longer should I wait?(I'm not saying you want it to be resolved quickly.)
Please let me know the approximate schedule.

(I'm not familiar with English. Please don't misunderstand what I'm saying.)

@mp911de
Copy link
Member

mp911de commented Nov 15, 2022

Transaction definition flags are propagated to the transaction context after obtaining a connection from a connection factory, therefore, the read only flag isn't evaluated at the time of connection retrieval. Your only chance to apply a routing across connection factories is attaching some contextual information to the coroutine invocation and not rely on Spring's @Transactional annotation.

@stonegyu
Copy link
Author

@mp911de
ah..! okay.
Could you please share a sample code that can rollback a transaction using AbstractRoutingConnectionFactory?

I think it will help not only me but also many developers.

@kwidjaja1312
Copy link

kwidjaja1312 commented May 12, 2023

Hello, @mp911de. I would like to add some examples that is the current situation with my application. Due to the company policy, i could only share the high-level information here.

Below is the setup of my application:

  • Spring: spring-boot-3.0.6
  • R2DBC: io.r2dbc:r2dbc-pool-1.0.0.RELEASE, io.asyncer:r2dbc-mysql-1.0.0
  • Other dependencies that are not related to Spring nor R2DBC at all.

Ever since i started to use the DatabaseEndpointAwareConnectionFactory, the transaction does not work at all.

Regardless using @Transactional or TransactionalOperator or both, I got this error below:

//...truncated for clarity
"errMsg": "retry has been exhausted. executeMany; SQL [INSERT INTO x (id, name, organization_id, ...) VALUES (?, ?, ?, ...)]; Lock wait timeout exceeded; try restarting transaction",
//...truncated for clarity

That failure happened to the same code base after i upgraded to Spring Boot 3.x which includes the major upgrade on the R2DBC specification itself. Let's consider this sample codes for illustration, the main point is the usage of the TransactionalOperator.

interface RepositoryA extends ReactiveCrudRepository<A, Integer> {
  ...
}

interface RepositoryB extends ReactiveCrudRepository<B, Long> {
  ...
}

@Service
public class ServiceA {
  private final RepositoryA repoA;

  public Mono<A> saveNew(A a) {
    ...
  }
}

@Service
public class ServiceB {
  private final RepositoryB repoB;

  public Mono<B> saveNew(B b) {
    ...
  }
}

@Data
public class Home {
  private A a;
  private B b;
}

@Controller
public class HomeController {

  private final TransactionalOperator operator;
  private final ServiceA serviceA;
  private final ServiceB serviceB;

  @PostMapping("/addService")
  public Mono<Home> addServices(@RequestBody Home home) {
    return Mono.zip(
          serviceA.save(home.getA()),
          serviceB.save(home.getB()),
      )
      .map(t2 -> {
          home.setA(t2.getT1());
          home.setB(t2.getT2());
         
          return home;
      })
      .as(operator::transactional);
  }
}

Even when i changed the controller into something like below, i still got the same error.

NOTE: the @EnableTransactionManagement has been configured somewhere else.


@Controller
public class HomeController {

  private final TransactionalOperator operator;
  private final ServiceA serviceA;
  private final ServiceB serviceB;

  @PostMapping("/addService")
  @Transactional(propagation = Propagation.REQUIRE_NEW, timeout = 10, rollbackFor = [{Exception.class}])
  public Mono<Home> addServices(@RequestBody Home home) {
    return Mono.zip(
          serviceA.save(home.getA()),
          serviceB.save(home.getB()),
      )
      .map(t2 -> {
          home.setA(t2.getT1());
          home.setB(t2.getT2());
          
          return home;
      });
  }
}

To make it more interesting, the same codes above was working fine when i was still using this stack:

  • Spring: spring-boot-2.7.6
  • R2DBC: io.r2dbc:r2dbc-pool-1.0.0.RELEASE, dev.miku:r2dbc-mysql-0.8.2
  • Other dependencies that are not related to Spring nor R2DBC at all.

@mp911de So, do i need to refactor my codes to a certain extent to make the TransactionalOperator works again? Could you enlighten me where is the problem with the above codes? Thank you very much for your pointer in advance.

@mp911de
Copy link
Member

mp911de commented Sep 13, 2023

It would be something along the lines of:

        val routingConnectionFactory: AbstractRoutingConnectionFactory = object : AbstractRoutingConnectionFactory() {
            override fun determineCurrentLookupKey(): Mono<Any> {
                return Mono.deferContextual(contextView -> Mono.just(contextView.getOrDefault("routing", "primary")))
            }
        }

On the resulting publisher that contains the Database access sequence, you would append something contextual like .contextWrite(context -> context.put("routing", "secondary")).

@mp911de mp911de closed this as completed Sep 13, 2023
@mp911de mp911de added for: stackoverflow A question that's better suited to stackoverflow.com and removed status: feedback-provided Feedback has been provided status: waiting-for-triage An issue we've not yet triaged labels Sep 13, 2023
@yangwenliang123
Copy link

它将是这样的:

        val routingConnectionFactory: AbstractRoutingConnectionFactory = object : AbstractRoutingConnectionFactory() {
            override fun determineCurrentLookupKey(): Mono<Any> {
                return Mono.deferContextual(contextView -> Mono.just(contextView.getOrDefault("routing", "primary")))
            }
        }

在包含 Database 访问序列的结果发布者上,您可以附加类似 ..contextWrite(context -> context.put("routing", "secondary"))

I want to parse SQL to determine a specific shard that will ultimately access the database

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
for: stackoverflow A question that's better suited to stackoverflow.com
Projects
None yet
Development

No branches or pull requests

5 participants