Skip to content

Commit

Permalink
Supported CommandNameMapping. See: eventuate-tram/eventuate-tram-core…
Browse files Browse the repository at this point in the history
  • Loading branch information
dartartem committed Aug 30, 2021
1 parent dc683c8 commit 005d3a8
Show file tree
Hide file tree
Showing 9 changed files with 51 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.eventuate.tram.sagas.micronaut.participant;

import io.eventuate.tram.commands.common.CommandNameMapping;
import io.eventuate.tram.messaging.consumer.MessageConsumer;
import io.eventuate.tram.messaging.producer.MessageProducer;
import io.eventuate.tram.sagas.common.SagaLockManager;
Expand All @@ -11,7 +12,10 @@
@Factory
public class SagaParticipantFactory {
@Singleton
public SagaCommandDispatcherFactory sagaCommandDispatcherFactory(MessageConsumer messageConsumer, MessageProducer messageProducer, SagaLockManager sagaLockManager) {
return new SagaCommandDispatcherFactory(messageConsumer, messageProducer, sagaLockManager);
public SagaCommandDispatcherFactory sagaCommandDispatcherFactory(MessageConsumer messageConsumer,
MessageProducer messageProducer,
SagaLockManager sagaLockManager,
CommandNameMapping commandNameMapping) {
return new SagaCommandDispatcherFactory(messageConsumer, messageProducer, sagaLockManager, commandNameMapping);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.eventuate.tram.sagas.micronaut.testing;

import io.eventuate.tram.commands.common.CommandNameMapping;
import io.eventuate.tram.messaging.consumer.MessageConsumer;
import io.eventuate.tram.messaging.producer.MessageProducer;
import io.eventuate.tram.sagas.testing.SagaParticipantChannels;
Expand All @@ -12,7 +13,10 @@
public class SagaParticipantStubManagerFactory {

@Singleton
public SagaParticipantStubManager sagaParticipantStubManager(SagaParticipantChannels sagaParticipantChannels, MessageConsumer messageConsumer, MessageProducer messageProducer) {
return new SagaParticipantStubManager(sagaParticipantChannels, messageConsumer, messageProducer);
public SagaParticipantStubManager sagaParticipantStubManager(SagaParticipantChannels sagaParticipantChannels,
MessageConsumer messageConsumer,
MessageProducer messageProducer,
CommandNameMapping commandNameMapping) {
return new SagaParticipantStubManager(sagaParticipantChannels, messageConsumer, messageProducer, commandNameMapping);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.eventuate.tram.sagas.participant;

import io.eventuate.tram.commands.common.CommandMessageHeaders;
import io.eventuate.tram.commands.common.CommandNameMapping;
import io.eventuate.tram.commands.consumer.CommandDispatcher;
import io.eventuate.tram.commands.consumer.CommandHandler;
import io.eventuate.tram.commands.consumer.CommandHandlers;
Expand All @@ -25,8 +26,9 @@ public SagaCommandDispatcher(String commandDispatcherId,
CommandHandlers target,
MessageConsumer messageConsumer,
MessageProducer messageProducer,
SagaLockManager sagaLockManager) {
super(commandDispatcherId, target, messageConsumer, messageProducer);
SagaLockManager sagaLockManager,
CommandNameMapping commandNameMapping) {
super(commandDispatcherId, target, messageConsumer, messageProducer, commandNameMapping);
this.sagaLockManager = sagaLockManager;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.eventuate.tram.sagas.participant;

import io.eventuate.tram.commands.common.CommandNameMapping;
import io.eventuate.tram.commands.consumer.CommandHandlers;
import io.eventuate.tram.messaging.consumer.MessageConsumer;
import io.eventuate.tram.messaging.producer.MessageProducer;
Expand All @@ -10,14 +11,19 @@ public class SagaCommandDispatcherFactory {
private final MessageConsumer messageConsumer;
private final MessageProducer messageProducer;
private final SagaLockManager sagaLockManager;
private final CommandNameMapping commandNameMapping;

public SagaCommandDispatcherFactory(MessageConsumer messageConsumer, MessageProducer messageProducer, SagaLockManager sagaLockManager) {
public SagaCommandDispatcherFactory(MessageConsumer messageConsumer,
MessageProducer messageProducer,
SagaLockManager sagaLockManager,
CommandNameMapping commandNameMapping) {
this.messageConsumer = messageConsumer;
this.messageProducer = messageProducer;
this.sagaLockManager = sagaLockManager;
this.commandNameMapping = commandNameMapping;
}

public SagaCommandDispatcher make(String commandDispatcherId, CommandHandlers target) {
return new SagaCommandDispatcher(commandDispatcherId, target, messageConsumer, messageProducer, sagaLockManager);
return new SagaCommandDispatcher(commandDispatcherId, target, messageConsumer, messageProducer, sagaLockManager, commandNameMapping);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.eventuate.tram.sagas.spring.participant;

import io.eventuate.tram.commands.common.CommandNameMapping;
import io.eventuate.tram.messaging.consumer.MessageConsumer;
import io.eventuate.tram.messaging.producer.MessageProducer;
import io.eventuate.tram.sagas.common.SagaLockManager;
Expand All @@ -15,7 +16,8 @@ public class SagaParticipantConfiguration {
@Bean
public SagaCommandDispatcherFactory sagaCommandDispatcherFactory(MessageConsumer messageConsumer,
MessageProducer messageProducer,
SagaLockManager sagaLockManager) {
return new SagaCommandDispatcherFactory(messageConsumer, messageProducer, sagaLockManager);
SagaLockManager sagaLockManager,
CommandNameMapping commandNameMapping) {
return new SagaCommandDispatcherFactory(messageConsumer, messageProducer, sagaLockManager, commandNameMapping);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.eventuate.tram.sagas.spring.testing;

import io.eventuate.tram.commands.common.CommandNameMapping;
import io.eventuate.tram.sagas.testing.SagaParticipantChannels;
import io.eventuate.tram.sagas.testing.SagaParticipantStubManager;
import io.eventuate.tram.spring.commands.producer.TramCommandProducerConfiguration;
Expand All @@ -15,8 +16,11 @@
public class SagaParticipantStubManagerConfiguration {

@Bean
public SagaParticipantStubManager sagaParticipantStubManager(SagaParticipantChannels sagaParticipantChannels, MessageConsumer messageConsumer, MessageProducer messageProducer) {
return new SagaParticipantStubManager(sagaParticipantChannels, messageConsumer, messageProducer);
public SagaParticipantStubManager sagaParticipantStubManager(SagaParticipantChannels sagaParticipantChannels,
MessageConsumer messageConsumer,
MessageProducer messageProducer,
CommandNameMapping commandNameMapping) {
return new SagaParticipantStubManager(sagaParticipantChannels, messageConsumer, messageProducer, commandNameMapping);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.eventuate.common.json.mapper.JSonMapper;
import io.eventuate.tram.commands.common.Command;
import io.eventuate.tram.commands.common.CommandNameMapping;
import io.eventuate.tram.commands.consumer.CommandMessage;
import io.eventuate.tram.messaging.common.Message;
import io.eventuate.tram.messaging.consumer.MessageConsumer;
Expand Down Expand Up @@ -29,13 +30,17 @@ public class SagaParticipantStubManager {
private String currentCommandChannel;


public SagaParticipantStubManager(SagaParticipantChannels sagaParticipantChannels, MessageConsumer messageConsumer, MessageProducer messageProducer) {
public SagaParticipantStubManager(SagaParticipantChannels sagaParticipantChannels,
MessageConsumer messageConsumer,
MessageProducer messageProducer,
CommandNameMapping commandNameMapping) {
this.commandChannels = sagaParticipantChannels.getChannels();
this.commandHandlers = new ReconfigurableCommandHandlers(this.commandChannels);
this.commandDispatcher = new UnhandledMessageTrackingCommandDispatcher("SagaParticipantStubManager-command-dispatcher-" + System.currentTimeMillis(),
commandHandlers,
messageConsumer,
messageProducer);
messageProducer,
commandNameMapping);

/// TODO handle scenario where a command is recieved for which there is not a handler.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,13 @@ public void update(SagaInstance sagaInstance) {

};

CommandNameMapping commandNameMapping = new DefaultCommandNameMapping();

CommandProducerImpl commandProducer = new CommandProducerImpl((destination, message) -> {
String id = genId();
message.setHeader(Message.ID, id);
sentCommands.add(new MessageWithDestination(destination, message));
});
}, commandNameMapping);

SagaCommandProducer sagaCommandProducer = new SagaCommandProducer(commandProducer);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.eventuate.tram.sagas.testing.commandhandling;

import io.eventuate.tram.commands.common.CommandNameMapping;
import io.eventuate.tram.commands.consumer.CommandDispatcher;
import io.eventuate.tram.commands.consumer.CommandHandler;
import io.eventuate.tram.commands.consumer.CommandHandlers;
Expand All @@ -22,8 +23,12 @@ public class UnhandledMessageTrackingCommandDispatcher extends CommandDispatcher
private CommandHandlers commandHandlers;
private List<Message> unhandledMessages = new LinkedList<>();

public UnhandledMessageTrackingCommandDispatcher(String commandDispatcherId, CommandHandlers commandHandlers, MessageConsumer messageConsumer, MessageProducer messageProducer) {
super(commandDispatcherId, commandHandlers, messageConsumer, messageProducer);
public UnhandledMessageTrackingCommandDispatcher(String commandDispatcherId,
CommandHandlers commandHandlers,
MessageConsumer messageConsumer,
MessageProducer messageProducer,
CommandNameMapping commandNameMapping) {
super(commandDispatcherId, commandHandlers, messageConsumer, messageProducer, commandNameMapping);
this.commandHandlers = commandHandlers;
}

Expand Down

0 comments on commit 005d3a8

Please sign in to comment.