Skip to content

Commit

Permalink
Merge pull request #133 from dartartem/wip-db-id-gen
Browse files Browse the repository at this point in the history
Extracted transaction manager from in-memory tests, added additional …
  • Loading branch information
cer authored Feb 20, 2021
2 parents c76816b + a6df659 commit b9bd98a
Show file tree
Hide file tree
Showing 15 changed files with 314 additions and 177 deletions.
6 changes: 6 additions & 0 deletions eventuate-tram-common-spring-in-memory/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
apply plugin: PublicModulePlugin

dependencies {
compile project(":eventuate-tram-in-memory")
compile "org.springframework.boot:spring-boot-starter-jdbc:$springBootCdcVersion"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.eventuate.tram.common.spring.inmemory;

import io.eventuate.tram.inmemory.EventuateTransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;

public class EventuateSpringTransactionSynchronizationManager
implements EventuateTransactionSynchronizationManager {

@Override
public boolean isTransactionActive() {
return TransactionSynchronizationManager.isActualTransactionActive();
}

@Override
public void executeAfterTransaction(Runnable callback) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
callback.run();
}
});
}
}
7 changes: 7 additions & 0 deletions eventuate-tram-in-memory-test/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
apply plugin: PublicModulePlugin

dependencies {
compile project(":eventuate-tram-in-memory")

compile "junit:junit:4.12"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package io.eventuate.tram.inmemory.test;

import io.eventuate.tram.messaging.common.Message;
import io.eventuate.tram.messaging.consumer.MessageConsumer;
import io.eventuate.tram.messaging.consumer.MessageHandler;
import io.eventuate.tram.messaging.producer.MessageBuilder;
import io.eventuate.tram.messaging.producer.MessageProducer;

import java.util.Collections;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

public abstract class AbstractInMemoryMessageProducerTest {
protected String subscriberId;
protected String destination;
protected String payload;
protected MyMessageHandler mh;

protected abstract MessageProducer getMessageProducer();
protected abstract MessageConsumer getMessageConsumer();
protected abstract void executeInTransaction(Consumer<Runnable> callbackWithRollback);

public void setUp() {
subscriberId = "subscriberId-" + System.currentTimeMillis();
destination = "destination-" + System.currentTimeMillis();
payload = "payload-" + System.currentTimeMillis();
mh = new MyMessageHandler();
}

public void shouldDeliverToMatchingSubscribers() {
subscribe();
Message m = sendMessage();
assertNotNull(m.getId());
assertMessageReceived();
}

public void shouldSetIdWithinTransaction() {
Message m = makeMessage();
executeInTransaction(rollbackCallback -> {
getMessageProducer().send(destination, m);
assertNotNull(m.getId());
});
}

public void shouldDeliverToWildcardSubscribers() {
wildcardSubscribe();
sendMessage();
assertMessageReceived();
}

public void shouldReceiveMessageAfterTransaction() {
subscribe();
executeInTransaction(rollbackCallback -> sendMessage());
assertMessageReceived();
}

public void shouldNotReceiveMessageBeforeTransaction() {
subscribe();

executeInTransaction(rollbackCallback -> {
sendMessage();
assertMessageNotReceived();
});

assertMessageReceived();
}

public void shouldNotReceiveMessageAfterTransactionRollback() {
subscribe();

executeInTransaction(rollbackCallback -> {
sendMessage();
rollbackCallback.run();
});

assertMessageNotReceived();
}

protected void assertMessageReceived() {
mh.assertMessageReceived(payload);
}

protected void assertMessageNotReceived() {
mh.assertMessageNotReceived(payload);
}

protected Message sendMessage() {
Message m = makeMessage();
getMessageProducer().send(destination, m);
return m;
}

protected void wildcardSubscribe() {
subscribe("*");
}

protected void subscribe() {
subscribe(destination);
}

protected void subscribe(String destination) {
getMessageConsumer().subscribe(subscriberId, Collections.singleton(destination), mh);
}

protected Message makeMessage() {
return MessageBuilder.withPayload(payload).withHeader(Message.DESTINATION, destination).build();
}

protected static class MyMessageHandler implements MessageHandler {

protected BlockingQueue<String> queue = new LinkedBlockingDeque<>();

@Override
public void accept(Message message) {
try {
queue.put(message.getPayload());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

void assertMessageReceived(String payload) {
assertTrue(searchMessage(payload));
}

void assertMessageNotReceived(String payload) {
assertFalse(searchMessage(payload));
}

boolean searchMessage(String payload) {
String m;

try {
while ((m = queue.poll(3, TimeUnit.SECONDS)) != null) {
if (payload.equals(m)) {
return true;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

return false;
}
}
}
4 changes: 2 additions & 2 deletions eventuate-tram-in-memory/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ apply plugin: PublicModulePlugin

dependencies {
compile project(":eventuate-tram-messaging")
compile project(":eventuate-tram-spring-consumer-common")
compile project(":eventuate-tram-spring-messaging-producer-common")
compile project(":eventuate-tram-consumer-common")
compile project(":eventuate-tram-messaging-producer-common")

compile project(":eventuate-tram-consumer-jdbc")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.eventuate.tram.inmemory;

public interface EventuateTransactionSynchronizationManager {
boolean isTransactionActive();
void executeAfterTransaction(Runnable runnable);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,27 @@
import io.eventuate.tram.messaging.producer.common.MessageProducerImplementation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;

public class InMemoryMessageProducer implements MessageProducerImplementation {

private Logger logger = LoggerFactory.getLogger(getClass());

private final InMemoryMessageConsumer messageConsumer;
private EventuateTransactionSynchronizationManager eventuateTransactionSynchronizationManager;

private ApplicationIdGenerator applicationIdGenerator = new ApplicationIdGenerator();

public InMemoryMessageProducer(InMemoryMessageConsumer messageConsumer) {
public InMemoryMessageProducer(InMemoryMessageConsumer messageConsumer,
EventuateTransactionSynchronizationManager eventuateTransactionSynchronizationManager) {
this.messageConsumer = messageConsumer;
this.eventuateTransactionSynchronizationManager = eventuateTransactionSynchronizationManager;
}

@Override
public void withContext(Runnable runnable) {
if (TransactionSynchronizationManager.isActualTransactionActive()) {
if (eventuateTransactionSynchronizationManager.isTransactionActive()) {
logger.info("Transaction active");
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
runnable.run();
}
});
eventuateTransactionSynchronizationManager.executeAfterTransaction(runnable);
} else {
logger.info("No transaction active");
runnable.run();
Expand Down
2 changes: 2 additions & 0 deletions eventuate-tram-micronaut-in-memory/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ apply plugin: PublicModulePlugin

dependencies {
compile project(":eventuate-tram-in-memory")
compile project(":eventuate-tram-common-spring-in-memory")
testCompile project(":eventuate-tram-in-memory-test")

compile project(":eventuate-tram-micronaut-messaging")
compile project(":eventuate-tram-micronaut-consumer-common")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.eventuate.tram.micronaut.inmemory;

import io.eventuate.common.id.IdGenerator;
import io.eventuate.common.inmemorydatabase.EventuateDatabaseScriptSupplier;
import io.eventuate.tram.common.spring.inmemory.EventuateSpringTransactionSynchronizationManager;
import io.eventuate.tram.consumer.common.MessageConsumerImplementation;
import io.eventuate.tram.inmemory.InMemoryMessageConsumer;
import io.eventuate.tram.inmemory.InMemoryMessageProducer;
Expand Down Expand Up @@ -33,7 +33,7 @@ public MessageConsumerImplementation messageConsumerImplementation(InMemoryMessa

@Singleton
public InMemoryMessageProducer inMemoryMessageProducer(InMemoryMessageConsumer messageConsumer) {
return new InMemoryMessageProducer(messageConsumer);
return new InMemoryMessageProducer(messageConsumer, new EventuateSpringTransactionSynchronizationManager());
}

@Singleton
Expand Down
Loading

0 comments on commit b9bd98a

Please sign in to comment.