From a6df65935d678017d259ed6381702d46a71d48f8 Mon Sep 17 00:00:00 2001 From: Artem Sidorkin Date: Sat, 20 Feb 2021 17:28:09 +0300 Subject: [PATCH] Extracted transaction manager from in-memory tests, added additional test cases. See https://github.com/eventuate-tram/eventuate-tram-sagas-quarkus/issues/6 --- .../build.gradle | 6 + ...ringTransactionSynchronizationManager.java | 24 +++ eventuate-tram-in-memory-test/build.gradle | 7 + .../AbstractInMemoryMessageProducerTest.java | 152 ++++++++++++++++++ eventuate-tram-in-memory/build.gradle | 4 +- ...uateTransactionSynchronizationManager.java | 6 + .../inmemory/InMemoryMessageProducer.java | 16 +- .../build.gradle | 2 + .../inmemory/TramInMemoryFactory.java | 4 +- .../inmemory/InMemoryMessageProducerTest.java | 119 +++++--------- .../src/test/resources/application.yml | 13 +- eventuate-tram-spring-in-memory/build.gradle | 3 + .../TramInMemoryCommonConfiguration.java | 10 +- .../inmemory/InMemoryMessageProducerTest.java | 123 ++++++-------- settings.gradle | 2 + 15 files changed, 314 insertions(+), 177 deletions(-) create mode 100644 eventuate-tram-common-spring-in-memory/build.gradle create mode 100644 eventuate-tram-common-spring-in-memory/src/main/java/io/eventuate/tram/common/spring/inmemory/EventuateSpringTransactionSynchronizationManager.java create mode 100644 eventuate-tram-in-memory-test/build.gradle create mode 100644 eventuate-tram-in-memory-test/src/main/java/io/eventuate/tram/inmemory/test/AbstractInMemoryMessageProducerTest.java create mode 100644 eventuate-tram-in-memory/src/main/java/io/eventuate/tram/inmemory/EventuateTransactionSynchronizationManager.java diff --git a/eventuate-tram-common-spring-in-memory/build.gradle b/eventuate-tram-common-spring-in-memory/build.gradle new file mode 100644 index 00000000..83f687c5 --- /dev/null +++ b/eventuate-tram-common-spring-in-memory/build.gradle @@ -0,0 +1,6 @@ +apply plugin: PublicModulePlugin + +dependencies { + compile project(":eventuate-tram-in-memory") + compile "org.springframework.boot:spring-boot-starter-jdbc:$springBootCdcVersion" +} \ No newline at end of file diff --git a/eventuate-tram-common-spring-in-memory/src/main/java/io/eventuate/tram/common/spring/inmemory/EventuateSpringTransactionSynchronizationManager.java b/eventuate-tram-common-spring-in-memory/src/main/java/io/eventuate/tram/common/spring/inmemory/EventuateSpringTransactionSynchronizationManager.java new file mode 100644 index 00000000..989c333a --- /dev/null +++ b/eventuate-tram-common-spring-in-memory/src/main/java/io/eventuate/tram/common/spring/inmemory/EventuateSpringTransactionSynchronizationManager.java @@ -0,0 +1,24 @@ +package io.eventuate.tram.common.spring.inmemory; + +import io.eventuate.tram.inmemory.EventuateTransactionSynchronizationManager; +import org.springframework.transaction.support.TransactionSynchronizationAdapter; +import org.springframework.transaction.support.TransactionSynchronizationManager; + +public class EventuateSpringTransactionSynchronizationManager + implements EventuateTransactionSynchronizationManager { + + @Override + public boolean isTransactionActive() { + return TransactionSynchronizationManager.isActualTransactionActive(); + } + + @Override + public void executeAfterTransaction(Runnable callback) { + TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() { + @Override + public void afterCommit() { + callback.run(); + } + }); + } +} diff --git a/eventuate-tram-in-memory-test/build.gradle b/eventuate-tram-in-memory-test/build.gradle new file mode 100644 index 00000000..ba286b3d --- /dev/null +++ b/eventuate-tram-in-memory-test/build.gradle @@ -0,0 +1,7 @@ +apply plugin: PublicModulePlugin + +dependencies { + compile project(":eventuate-tram-in-memory") + + compile "junit:junit:4.12" +} \ No newline at end of file diff --git a/eventuate-tram-in-memory-test/src/main/java/io/eventuate/tram/inmemory/test/AbstractInMemoryMessageProducerTest.java b/eventuate-tram-in-memory-test/src/main/java/io/eventuate/tram/inmemory/test/AbstractInMemoryMessageProducerTest.java new file mode 100644 index 00000000..1b56cc7b --- /dev/null +++ b/eventuate-tram-in-memory-test/src/main/java/io/eventuate/tram/inmemory/test/AbstractInMemoryMessageProducerTest.java @@ -0,0 +1,152 @@ +package io.eventuate.tram.inmemory.test; + +import io.eventuate.tram.messaging.common.Message; +import io.eventuate.tram.messaging.consumer.MessageConsumer; +import io.eventuate.tram.messaging.consumer.MessageHandler; +import io.eventuate.tram.messaging.producer.MessageBuilder; +import io.eventuate.tram.messaging.producer.MessageProducer; + +import java.util.Collections; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public abstract class AbstractInMemoryMessageProducerTest { + protected String subscriberId; + protected String destination; + protected String payload; + protected MyMessageHandler mh; + + protected abstract MessageProducer getMessageProducer(); + protected abstract MessageConsumer getMessageConsumer(); + protected abstract void executeInTransaction(Consumer callbackWithRollback); + + public void setUp() { + subscriberId = "subscriberId-" + System.currentTimeMillis(); + destination = "destination-" + System.currentTimeMillis(); + payload = "payload-" + System.currentTimeMillis(); + mh = new MyMessageHandler(); + } + + public void shouldDeliverToMatchingSubscribers() { + subscribe(); + Message m = sendMessage(); + assertNotNull(m.getId()); + assertMessageReceived(); + } + + public void shouldSetIdWithinTransaction() { + Message m = makeMessage(); + executeInTransaction(rollbackCallback -> { + getMessageProducer().send(destination, m); + assertNotNull(m.getId()); + }); + } + + public void shouldDeliverToWildcardSubscribers() { + wildcardSubscribe(); + sendMessage(); + assertMessageReceived(); + } + + public void shouldReceiveMessageAfterTransaction() { + subscribe(); + executeInTransaction(rollbackCallback -> sendMessage()); + assertMessageReceived(); + } + + public void shouldNotReceiveMessageBeforeTransaction() { + subscribe(); + + executeInTransaction(rollbackCallback -> { + sendMessage(); + assertMessageNotReceived(); + }); + + assertMessageReceived(); + } + + public void shouldNotReceiveMessageAfterTransactionRollback() { + subscribe(); + + executeInTransaction(rollbackCallback -> { + sendMessage(); + rollbackCallback.run(); + }); + + assertMessageNotReceived(); + } + + protected void assertMessageReceived() { + mh.assertMessageReceived(payload); + } + + protected void assertMessageNotReceived() { + mh.assertMessageNotReceived(payload); + } + + protected Message sendMessage() { + Message m = makeMessage(); + getMessageProducer().send(destination, m); + return m; + } + + protected void wildcardSubscribe() { + subscribe("*"); + } + + protected void subscribe() { + subscribe(destination); + } + + protected void subscribe(String destination) { + getMessageConsumer().subscribe(subscriberId, Collections.singleton(destination), mh); + } + + protected Message makeMessage() { + return MessageBuilder.withPayload(payload).withHeader(Message.DESTINATION, destination).build(); + } + + protected static class MyMessageHandler implements MessageHandler { + + protected BlockingQueue queue = new LinkedBlockingDeque<>(); + + @Override + public void accept(Message message) { + try { + queue.put(message.getPayload()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + void assertMessageReceived(String payload) { + assertTrue(searchMessage(payload)); + } + + void assertMessageNotReceived(String payload) { + assertFalse(searchMessage(payload)); + } + + boolean searchMessage(String payload) { + String m; + + try { + while ((m = queue.poll(3, TimeUnit.SECONDS)) != null) { + if (payload.equals(m)) { + return true; + } + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + return false; + } + } +} \ No newline at end of file diff --git a/eventuate-tram-in-memory/build.gradle b/eventuate-tram-in-memory/build.gradle index 144bc09f..3584884f 100644 --- a/eventuate-tram-in-memory/build.gradle +++ b/eventuate-tram-in-memory/build.gradle @@ -2,8 +2,8 @@ apply plugin: PublicModulePlugin dependencies { compile project(":eventuate-tram-messaging") - compile project(":eventuate-tram-spring-consumer-common") - compile project(":eventuate-tram-spring-messaging-producer-common") + compile project(":eventuate-tram-consumer-common") + compile project(":eventuate-tram-messaging-producer-common") compile project(":eventuate-tram-consumer-jdbc") diff --git a/eventuate-tram-in-memory/src/main/java/io/eventuate/tram/inmemory/EventuateTransactionSynchronizationManager.java b/eventuate-tram-in-memory/src/main/java/io/eventuate/tram/inmemory/EventuateTransactionSynchronizationManager.java new file mode 100644 index 00000000..4302b97c --- /dev/null +++ b/eventuate-tram-in-memory/src/main/java/io/eventuate/tram/inmemory/EventuateTransactionSynchronizationManager.java @@ -0,0 +1,6 @@ +package io.eventuate.tram.inmemory; + +public interface EventuateTransactionSynchronizationManager { + boolean isTransactionActive(); + void executeAfterTransaction(Runnable runnable); +} diff --git a/eventuate-tram-in-memory/src/main/java/io/eventuate/tram/inmemory/InMemoryMessageProducer.java b/eventuate-tram-in-memory/src/main/java/io/eventuate/tram/inmemory/InMemoryMessageProducer.java index 6002531f..096b9ece 100644 --- a/eventuate-tram-in-memory/src/main/java/io/eventuate/tram/inmemory/InMemoryMessageProducer.java +++ b/eventuate-tram-in-memory/src/main/java/io/eventuate/tram/inmemory/InMemoryMessageProducer.java @@ -6,31 +6,27 @@ import io.eventuate.tram.messaging.producer.common.MessageProducerImplementation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.transaction.support.TransactionSynchronizationAdapter; -import org.springframework.transaction.support.TransactionSynchronizationManager; public class InMemoryMessageProducer implements MessageProducerImplementation { private Logger logger = LoggerFactory.getLogger(getClass()); private final InMemoryMessageConsumer messageConsumer; + private EventuateTransactionSynchronizationManager eventuateTransactionSynchronizationManager; private ApplicationIdGenerator applicationIdGenerator = new ApplicationIdGenerator(); - public InMemoryMessageProducer(InMemoryMessageConsumer messageConsumer) { + public InMemoryMessageProducer(InMemoryMessageConsumer messageConsumer, + EventuateTransactionSynchronizationManager eventuateTransactionSynchronizationManager) { this.messageConsumer = messageConsumer; + this.eventuateTransactionSynchronizationManager = eventuateTransactionSynchronizationManager; } @Override public void withContext(Runnable runnable) { - if (TransactionSynchronizationManager.isActualTransactionActive()) { + if (eventuateTransactionSynchronizationManager.isTransactionActive()) { logger.info("Transaction active"); - TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() { - @Override - public void afterCommit() { - runnable.run(); - } - }); + eventuateTransactionSynchronizationManager.executeAfterTransaction(runnable); } else { logger.info("No transaction active"); runnable.run(); diff --git a/eventuate-tram-micronaut-in-memory/build.gradle b/eventuate-tram-micronaut-in-memory/build.gradle index b5810f84..b15580f6 100644 --- a/eventuate-tram-micronaut-in-memory/build.gradle +++ b/eventuate-tram-micronaut-in-memory/build.gradle @@ -14,6 +14,8 @@ apply plugin: PublicModulePlugin dependencies { compile project(":eventuate-tram-in-memory") + compile project(":eventuate-tram-common-spring-in-memory") + testCompile project(":eventuate-tram-in-memory-test") compile project(":eventuate-tram-micronaut-messaging") compile project(":eventuate-tram-micronaut-consumer-common") diff --git a/eventuate-tram-micronaut-in-memory/src/main/java/io/eventuate/tram/micronaut/inmemory/TramInMemoryFactory.java b/eventuate-tram-micronaut-in-memory/src/main/java/io/eventuate/tram/micronaut/inmemory/TramInMemoryFactory.java index 2434f280..67264271 100644 --- a/eventuate-tram-micronaut-in-memory/src/main/java/io/eventuate/tram/micronaut/inmemory/TramInMemoryFactory.java +++ b/eventuate-tram-micronaut-in-memory/src/main/java/io/eventuate/tram/micronaut/inmemory/TramInMemoryFactory.java @@ -1,7 +1,7 @@ package io.eventuate.tram.micronaut.inmemory; -import io.eventuate.common.id.IdGenerator; import io.eventuate.common.inmemorydatabase.EventuateDatabaseScriptSupplier; +import io.eventuate.tram.common.spring.inmemory.EventuateSpringTransactionSynchronizationManager; import io.eventuate.tram.consumer.common.MessageConsumerImplementation; import io.eventuate.tram.inmemory.InMemoryMessageConsumer; import io.eventuate.tram.inmemory.InMemoryMessageProducer; @@ -33,7 +33,7 @@ public MessageConsumerImplementation messageConsumerImplementation(InMemoryMessa @Singleton public InMemoryMessageProducer inMemoryMessageProducer(InMemoryMessageConsumer messageConsumer) { - return new InMemoryMessageProducer(messageConsumer); + return new InMemoryMessageProducer(messageConsumer, new EventuateSpringTransactionSynchronizationManager()); } @Singleton diff --git a/eventuate-tram-micronaut-in-memory/src/test/java/io/eventuate/tram/micronaut/inmemory/InMemoryMessageProducerTest.java b/eventuate-tram-micronaut-in-memory/src/test/java/io/eventuate/tram/micronaut/inmemory/InMemoryMessageProducerTest.java index d4d79ac3..285d88ab 100644 --- a/eventuate-tram-micronaut-in-memory/src/test/java/io/eventuate/tram/micronaut/inmemory/InMemoryMessageProducerTest.java +++ b/eventuate-tram-micronaut-in-memory/src/test/java/io/eventuate/tram/micronaut/inmemory/InMemoryMessageProducerTest.java @@ -1,116 +1,85 @@ package io.eventuate.tram.micronaut.inmemory; -import io.eventuate.tram.inmemory.InMemoryMessageConsumer; -import io.eventuate.tram.inmemory.InMemoryMessageProducer; -import io.eventuate.tram.messaging.common.Message; -import io.eventuate.tram.messaging.consumer.MessageHandler; -import io.eventuate.tram.messaging.producer.MessageBuilder; +import io.eventuate.tram.messaging.consumer.MessageConsumer; +import io.eventuate.tram.messaging.producer.MessageProducer; +import io.eventuate.tram.inmemory.test.AbstractInMemoryMessageProducerTest; import io.micronaut.test.annotation.MicronautTest; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.springframework.transaction.support.TransactionCallback; import org.springframework.transaction.support.TransactionTemplate; import javax.inject.Inject; -import java.util.Collections; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.fail; +import java.util.function.Consumer; @MicronautTest(transactional = false) -public class InMemoryMessageProducerTest { - - private String subscriberId; - private String destination; - private String payload; - private MyMessageHandler mh; +public class InMemoryMessageProducerTest extends AbstractInMemoryMessageProducerTest { @Inject - private InMemoryMessageProducer inMemoryMessageProducer; + private MessageProducer messageProducer; @Inject - private InMemoryMessageConsumer inMemoryMessageConsumer; + private MessageConsumer messageConsumer; @Inject private TransactionTemplate transactionTemplate; + @Override + protected MessageProducer getMessageProducer() { + return messageProducer; + } - class MyMessageHandler implements MessageHandler { - - private BlockingQueue queue = new LinkedBlockingDeque<>(); - - @Override - public void accept(Message message) { - try { - queue.put(message.getPayload()); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - void shouldReceiveMessage(String payload) { - String m; - try { - while ((m = queue.poll(1, TimeUnit.SECONDS)) != null) { - if (payload.equals(m)) - return; - } - fail("Didn't find message with payload: " + payload); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } + @Override + protected MessageConsumer getMessageConsumer() { + return messageConsumer; } + @Override + protected void executeInTransaction(Consumer callbackWithRollback) { + transactionTemplate.execute(status -> { + callbackWithRollback.accept(status::setRollbackOnly); + return null; + }); + } + + @Override @BeforeEach public void setUp() { - subscriberId = "subscriberId-" + System.currentTimeMillis(); - destination = "destination-" + System.currentTimeMillis(); - payload = "payload-" + System.currentTimeMillis(); - mh = new MyMessageHandler(); + super.setUp(); } + @Override @Test public void shouldDeliverToMatchingSubscribers() { - - inMemoryMessageConsumer.subscribe(subscriberId, Collections.singleton(destination), mh); - - Message m = makeMessage(); - inMemoryMessageProducer.send(m); - assertNotNull(m.getId()); - mh.shouldReceiveMessage(payload); - + super.shouldDeliverToMatchingSubscribers(); } + @Override @Test public void shouldSetIdWithinTransaction() { - Message m = makeMessage(); - transactionTemplate.execute((TransactionCallback) status -> { - inMemoryMessageProducer.send(m); - assertNotNull(m.getId()); - return null; - }); + super.shouldSetIdWithinTransaction(); } + @Override @Test public void shouldDeliverToWildcardSubscribers() { + super.shouldDeliverToWildcardSubscribers(); + } - inMemoryMessageConsumer.subscribe(subscriberId, Collections.singleton("*"), mh); - - Message m = makeMessage(); - - inMemoryMessageProducer.send(m); - - mh.shouldReceiveMessage(payload); - + @Override + @Test + public void shouldReceiveMessageAfterTransaction() { + super.shouldReceiveMessageAfterTransaction(); } - private Message makeMessage() { - return MessageBuilder.withPayload(payload).withHeader(Message.DESTINATION, destination).withHeader(Message.ID, "message-id").build(); + @Override + @Test + public void shouldNotReceiveMessageBeforeTransaction() { + super.shouldNotReceiveMessageBeforeTransaction(); } + @Override + @Test + public void shouldNotReceiveMessageAfterTransactionRollback() { + super.shouldNotReceiveMessageAfterTransactionRollback(); + } } \ No newline at end of file diff --git a/eventuate-tram-micronaut-in-memory/src/test/resources/application.yml b/eventuate-tram-micronaut-in-memory/src/test/resources/application.yml index da2d99be..c2c372f8 100644 --- a/eventuate-tram-micronaut-in-memory/src/test/resources/application.yml +++ b/eventuate-tram-micronaut-in-memory/src/test/resources/application.yml @@ -1,7 +1,8 @@ -datasources: - default: - url: ${DATASOURCE_URL} - username: ${DATASOURCE_USERNAME} - password: ${DATASOURCE_PASSWORD} - driverClassName: ${DATASOURCE_DRIVERCLASSNAME} +transactional: + noop: + duplicate: + message: + detector: + factory: + enabled: false diff --git a/eventuate-tram-spring-in-memory/build.gradle b/eventuate-tram-spring-in-memory/build.gradle index 36ac195c..dfd09517 100644 --- a/eventuate-tram-spring-in-memory/build.gradle +++ b/eventuate-tram-spring-in-memory/build.gradle @@ -2,6 +2,7 @@ apply plugin: PublicModulePlugin dependencies { compile project(":eventuate-tram-in-memory") + compile project(":eventuate-tram-common-spring-in-memory") compile project(":eventuate-tram-spring-messaging") compile project(":eventuate-tram-spring-consumer-common") @@ -12,6 +13,8 @@ dependencies { compile "io.eventuate.common:eventuate-common-spring-in-memory-database:$eventuateCommonVersion" + testCompile project(":eventuate-tram-in-memory-test") + testCompile "org.springframework.boot:spring-boot-starter-test:$springBootCdcVersion" } \ No newline at end of file diff --git a/eventuate-tram-spring-in-memory/src/main/java/io/eventuate/tram/spring/inmemory/TramInMemoryCommonConfiguration.java b/eventuate-tram-spring-in-memory/src/main/java/io/eventuate/tram/spring/inmemory/TramInMemoryCommonConfiguration.java index 481b4634..9307393c 100644 --- a/eventuate-tram-spring-in-memory/src/main/java/io/eventuate/tram/spring/inmemory/TramInMemoryCommonConfiguration.java +++ b/eventuate-tram-spring-in-memory/src/main/java/io/eventuate/tram/spring/inmemory/TramInMemoryCommonConfiguration.java @@ -1,12 +1,12 @@ package io.eventuate.tram.spring.inmemory; -import io.eventuate.common.id.IdGenerator; +import io.eventuate.common.inmemorydatabase.EventuateDatabaseScriptSupplier; import io.eventuate.common.spring.id.IdGeneratorConfiguration; import io.eventuate.common.spring.inmemorydatabase.EventuateCommonInMemoryDatabaseConfiguration; -import io.eventuate.common.inmemorydatabase.EventuateDatabaseScriptSupplier; -import io.eventuate.tram.spring.consumer.common.TramConsumerCommonConfiguration; +import io.eventuate.tram.common.spring.inmemory.EventuateSpringTransactionSynchronizationManager; import io.eventuate.tram.inmemory.InMemoryMessageConsumer; import io.eventuate.tram.inmemory.InMemoryMessageProducer; +import io.eventuate.tram.spring.consumer.common.TramConsumerCommonConfiguration; import io.eventuate.tram.spring.messaging.producer.common.TramMessagingCommonProducerConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -28,8 +28,8 @@ public InMemoryMessageConsumer inMemoryMessageConsumer() { } @Bean - public InMemoryMessageProducer inMemoryMessageProducer(InMemoryMessageConsumer messageConsumer, IdGenerator idGenerator) { - return new InMemoryMessageProducer(messageConsumer); + public InMemoryMessageProducer inMemoryMessageProducer(InMemoryMessageConsumer messageConsumer) { + return new InMemoryMessageProducer(messageConsumer, new EventuateSpringTransactionSynchronizationManager()); } @Bean diff --git a/eventuate-tram-spring-in-memory/src/test/java/io/eventuate/tram/spring/inmemory/InMemoryMessageProducerTest.java b/eventuate-tram-spring-in-memory/src/test/java/io/eventuate/tram/spring/inmemory/InMemoryMessageProducerTest.java index d5f7d35d..2cd5cd34 100644 --- a/eventuate-tram-spring-in-memory/src/test/java/io/eventuate/tram/spring/inmemory/InMemoryMessageProducerTest.java +++ b/eventuate-tram-spring-in-memory/src/test/java/io/eventuate/tram/spring/inmemory/InMemoryMessageProducerTest.java @@ -1,10 +1,8 @@ package io.eventuate.tram.spring.inmemory; -import io.eventuate.tram.inmemory.InMemoryMessageConsumer; -import io.eventuate.tram.inmemory.InMemoryMessageProducer; -import io.eventuate.tram.messaging.common.Message; -import io.eventuate.tram.messaging.consumer.MessageHandler; -import io.eventuate.tram.messaging.producer.MessageBuilder; +import io.eventuate.tram.messaging.consumer.MessageConsumer; +import io.eventuate.tram.messaging.producer.MessageProducer; +import io.eventuate.tram.inmemory.test.AbstractInMemoryMessageProducerTest; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -14,115 +12,86 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.test.context.junit4.SpringRunner; -import org.springframework.transaction.support.TransactionCallback; import org.springframework.transaction.support.TransactionTemplate; -import java.util.Collections; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.fail; +import java.util.function.Consumer; @RunWith(SpringRunner.class) @SpringBootTest(classes = InMemoryMessageProducerTest.InMemoryMessagingTestConfiguration.class) -public class InMemoryMessageProducerTest { - - private String subscriberId; - private String destination; - private String payload; - private MyMessageHandler mh; - +public class InMemoryMessageProducerTest extends AbstractInMemoryMessageProducerTest { @Configuration @EnableAutoConfiguration @Import({TramInMemoryConfiguration.class}) public static class InMemoryMessagingTestConfiguration { - } @Autowired - private InMemoryMessageProducer inMemoryMessageProducer; + private TransactionTemplate transactionTemplate; @Autowired - private InMemoryMessageConsumer inMemoryMessageConsumer; + private MessageProducer messageProducer; @Autowired - private TransactionTemplate transactionTemplate; - - - class MyMessageHandler implements MessageHandler { - - private BlockingQueue queue = new LinkedBlockingDeque<>(); - - @Override - public void accept(Message message) { - try { - queue.put(message.getPayload()); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - void shouldReceiveMessage(String payload) { - String m; - try { - while ((m = queue.poll(1, TimeUnit.SECONDS)) != null) { - if (payload.equals(m)) - return; - } - fail("Didn't find message with payload: " + payload); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - } + private MessageConsumer messageConsumer; + @Override @Before public void setUp() { - subscriberId = "subscriberId-" + System.currentTimeMillis(); - destination = "destination-" + System.currentTimeMillis(); - payload = "payload-" + System.currentTimeMillis(); - mh = new MyMessageHandler(); + super.setUp(); } + @Override @Test public void shouldDeliverToMatchingSubscribers() { - - inMemoryMessageConsumer.subscribe(subscriberId, Collections.singleton(destination), mh); - - Message m = makeMessage(); - inMemoryMessageProducer.send(m); - assertNotNull(m.getId()); - mh.shouldReceiveMessage(payload); - + super.shouldDeliverToMatchingSubscribers(); } + @Override @Test public void shouldSetIdWithinTransaction() { - Message m = makeMessage(); - transactionTemplate.execute((TransactionCallback) status -> { - inMemoryMessageProducer.send(m); - assertNotNull(m.getId()); - return null; - }); + super.shouldSetIdWithinTransaction(); } + @Override @Test public void shouldDeliverToWildcardSubscribers() { + super.shouldDeliverToWildcardSubscribers(); + } - inMemoryMessageConsumer.subscribe(subscriberId, Collections.singleton("*"), mh); - - Message m = makeMessage(); + @Override + @Test + public void shouldReceiveMessageAfterTransaction() { + super.shouldReceiveMessageAfterTransaction(); + } - inMemoryMessageProducer.send(m); + @Override + @Test + public void shouldNotReceiveMessageBeforeTransaction() { + super.shouldNotReceiveMessageBeforeTransaction(); + } - mh.shouldReceiveMessage(payload); + @Override + @Test + public void shouldNotReceiveMessageAfterTransactionRollback() { + super.shouldNotReceiveMessageAfterTransactionRollback(); + } + @Override + protected MessageProducer getMessageProducer() { + return messageProducer; } - private Message makeMessage() { - return MessageBuilder.withPayload(payload).withHeader(Message.DESTINATION, destination).withHeader(Message.ID, "message-id").build(); + @Override + protected MessageConsumer getMessageConsumer() { + return messageConsumer; } + @Override + protected void executeInTransaction(Consumer callbackWithRollback) { + transactionTemplate.execute(status -> { + callbackWithRollback.accept(status::setRollbackOnly); + + return null; + }); + } } \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 3be8a7c1..61573865 100644 --- a/settings.gradle +++ b/settings.gradle @@ -8,7 +8,9 @@ include 'eventuate-tram-producer-jdbc' include 'eventuate-tram-spring-producer-jdbc' include 'eventuate-tram-micronaut-producer-jdbc' include 'eventuate-tram-in-memory' +include 'eventuate-tram-in-memory-test' include 'eventuate-tram-spring-in-memory' +include 'eventuate-tram-common-spring-in-memory' include 'eventuate-tram-micronaut-in-memory' include 'eventuate-tram-consumer-common' include 'eventuate-tram-spring-consumer-common'