diff --git a/eventuate-tram-sagas-common-in-memory/src/main/resources/eventuate-tram-sagas-embedded.sql b/eventuate-tram-sagas-common-in-memory/src/main/resources/eventuate-tram-sagas-embedded.sql index d40f13d..5423cf2 100644 --- a/eventuate-tram-sagas-common-in-memory/src/main/resources/eventuate-tram-sagas-embedded.sql +++ b/eventuate-tram-sagas-common-in-memory/src/main/resources/eventuate-tram-sagas-embedded.sql @@ -16,6 +16,7 @@ CREATE TABLE IF NOT EXISTS eventuate.saga_instance( last_request_id VARCHAR(100), end_state INT(1), compensating INT(1), + failed INT(1), saga_data_type VARCHAR(1000) NOT NULL, saga_data_json VARCHAR(1000) NOT NULL, PRIMARY KEY(saga_type, saga_id) diff --git a/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/AbstractSimpleSagaDefinition.java b/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/AbstractSimpleSagaDefinition.java new file mode 100644 index 0000000..2318c45 --- /dev/null +++ b/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/AbstractSimpleSagaDefinition.java @@ -0,0 +1,72 @@ +package io.eventuate.tram.sagas.simpledsl; + +import io.eventuate.common.json.mapper.JSonMapper; +import io.eventuate.tram.commands.common.ReplyMessageHeaders; +import io.eventuate.tram.messaging.common.Message; +import io.eventuate.tram.sagas.orchestration.SagaActions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Optional; +import java.util.function.BiFunction; + +public abstract class AbstractSimpleSagaDefinition, + ToExecute extends AbstractStepToExecute> { + + protected Logger logger = LoggerFactory.getLogger(this.getClass()); + + protected List steps; + + public AbstractSimpleSagaDefinition(List steps) { + this.steps = steps; + } + + protected SagaActions handleFailedCompensatingTransaction(String sagaType, String sagaId, SagaExecutionState state, Message message) { + logger.error("Saga {} {} failed due to failed compensating transaction {}", sagaType, sagaId, message); + return SagaActions.builder() + .withUpdatedState(SagaExecutionStateJsonSerde.encodeState(SagaExecutionState.makeFailedEndState())) + .withIsEndState(true) + .withIsCompensating(state.isCompensating()) + .withIsFailed(true) + .build(); + } + + protected Optional nextStepToExecute(SagaExecutionState state, Data data) { + int skipped = 0; + boolean compensating = state.isCompensating(); + int direction = compensating ? -1 : +1; + for (int i = state.getCurrentlyExecuting() + direction; i >= 0 && i < steps.size(); i = i + direction) { + Step step = steps.get(i); + if ((compensating ? step.hasCompensation(data) : step.hasAction(data))) { + return Optional.of(makeStepToExecute(skipped, compensating, step)); + } else + skipped++; + } + return Optional.empty(); + } + + protected T invokeReplyHandler(Message message, Data data, BiFunction handler) { + Class m; + try { + String className = message.getRequiredHeader(ReplyMessageHeaders.REPLY_TYPE); + m = Class.forName(className, true, Thread.currentThread().getContextClassLoader()); + } catch (ClassNotFoundException e) { + logger.error("Class not found", e); + throw new RuntimeException("Class not found", e); + } + Object reply = JSonMapper.fromJson(message.getPayload(), m); + return handler.apply(data, reply); + } + + protected SagaActions makeEndStateSagaActions(SagaExecutionState state) { + return SagaActions.builder() + .withUpdatedState(SagaExecutionStateJsonSerde.encodeState(SagaExecutionState.makeEndState())) + .withIsEndState(true) + .withIsCompensating(state.isCompensating()) + .build(); + } + + protected abstract ToExecute makeStepToExecute(int skipped, boolean compensating, Step step); + +} diff --git a/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/AbstractStepToExecute.java b/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/AbstractStepToExecute.java new file mode 100644 index 0000000..d6110fc --- /dev/null +++ b/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/AbstractStepToExecute.java @@ -0,0 +1,29 @@ +package io.eventuate.tram.sagas.simpledsl; + +import io.eventuate.tram.sagas.orchestration.SagaActions; + +import static io.eventuate.tram.sagas.simpledsl.SagaExecutionStateJsonSerde.encodeState; + +public class AbstractStepToExecute> { + protected final SagaStep step; + protected final int skipped; + protected final boolean compensating; + + public AbstractStepToExecute(SagaStep step, int skipped, boolean compensating) { + this.step = step; + this.skipped = skipped; + this.compensating = compensating; + } + + protected int size() { + return 1 + skipped; + } + + protected SagaActions makeSagaActions(SagaActions.Builder builder, Data data, SagaExecutionState newState, boolean compensating) { + String state = encodeState(newState); + return builder.buildActions(data, compensating, state, newState.isEndState()); + } + + + +} diff --git a/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/ISagaStep.java b/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/ISagaStep.java new file mode 100644 index 0000000..9e4f10e --- /dev/null +++ b/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/ISagaStep.java @@ -0,0 +1,11 @@ +package io.eventuate.tram.sagas.simpledsl; + +import io.eventuate.tram.messaging.common.Message; + +public interface ISagaStep { + boolean isSuccessfulReply(boolean compensating, Message message); + + boolean hasAction(Data data); + + boolean hasCompensation(Data data); +} diff --git a/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/SagaExecutionState.java b/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/SagaExecutionState.java index c991d42..3aa8721 100644 --- a/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/SagaExecutionState.java +++ b/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/SagaExecutionState.java @@ -7,6 +7,8 @@ public class SagaExecutionState { private int currentlyExecuting; private boolean compensating; private boolean endState; + private boolean failed; + @Override public String toString() { @@ -53,9 +55,25 @@ public void setEndState(boolean endState) { this.endState = endState; } + public void setFailed(boolean failed) { + this.failed = failed; + } + + public boolean isFailed() { + return failed; + } + public static SagaExecutionState makeEndState() { SagaExecutionState x = new SagaExecutionState(); x.setEndState(true); return x; } + + public static SagaExecutionState makeFailedEndState() { + SagaExecutionState x = new SagaExecutionState(); + x.setEndState(true); + x.setFailed(true); + return x; + } + } diff --git a/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/SagaStep.java b/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/SagaStep.java index 0fdebbf..41c0a5e 100644 --- a/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/SagaStep.java +++ b/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/SagaStep.java @@ -5,14 +5,10 @@ import java.util.Optional; import java.util.function.BiConsumer; -public interface SagaStep { - boolean isSuccessfulReply(boolean compensating, Message message); +public interface SagaStep extends ISagaStep { Optional> getReplyHandler(Message message, boolean compensating); StepOutcome makeStepOutcome(Data data, boolean compensating); - boolean hasAction(Data data); - - boolean hasCompensation(Data data); } diff --git a/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/SimpleSagaDefinition.java b/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/SimpleSagaDefinition.java index 4e1bd0e..bf41f60 100644 --- a/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/SimpleSagaDefinition.java +++ b/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/SimpleSagaDefinition.java @@ -1,104 +1,69 @@ package io.eventuate.tram.sagas.simpledsl; -import io.eventuate.common.json.mapper.JSonMapper; -import io.eventuate.tram.commands.common.ReplyMessageHeaders; import io.eventuate.tram.messaging.common.Message; import io.eventuate.tram.sagas.orchestration.SagaActions; import io.eventuate.tram.sagas.orchestration.SagaDefinition; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.List; import java.util.Optional; -import java.util.function.BiConsumer; -public class SimpleSagaDefinition implements SagaDefinition { - private Logger logger = LoggerFactory.getLogger(this.getClass()); +public class SimpleSagaDefinition + extends AbstractSimpleSagaDefinition, StepToExecute> + implements SagaDefinition { - private List> sagaSteps; - public SimpleSagaDefinition(List> sagaSteps) { - this.sagaSteps = sagaSteps; + public SimpleSagaDefinition(List> steps) { + super(steps); } @Override public SagaActions start(Data sagaData) { SagaExecutionState currentState = new SagaExecutionState(-1, false); - StepToExecute stepToExecute = nextStepToExecute(currentState, sagaData); + Optional> stepToExecute = nextStepToExecute(currentState, sagaData); - if (stepToExecute.isEmpty()) { + if (!stepToExecute.isPresent()) { return makeEndStateSagaActions(currentState); } else - return stepToExecute.executeStep(sagaData, currentState); + return stepToExecute.get().executeStep(sagaData, currentState); } @Override - public SagaActions handleReply(String currentState, Data sagaData, Message message) { + public SagaActions handleReply(String sagaType, String sagaId, String currentState, Data sagaData, Message message) { SagaExecutionState state = SagaExecutionStateJsonSerde.decodeState(currentState); - SagaStep currentStep = sagaSteps.get(state.getCurrentlyExecuting()); + SagaStep currentStep = steps.get(state.getCurrentlyExecuting()); boolean compensating = state.isCompensating(); - currentStep.getReplyHandler(message, compensating).ifPresent(handler -> { - invokeReplyHandler(message, sagaData, handler); - }); + currentStep.getReplyHandler(message, compensating).ifPresent(handler -> invokeReplyHandler(message, sagaData, (d, m) -> { + handler.accept(d, m); + return null; + })); if (currentStep.isSuccessfulReply(compensating, message)) { return executeNextStep(sagaData, state); } else if (compensating) { - throw new UnsupportedOperationException("Failure when compensating"); + return handleFailedCompensatingTransaction(sagaType, sagaId, state, message); } else { return executeNextStep(sagaData, state.startCompensating()); } } - - private StepToExecute nextStepToExecute(SagaExecutionState state, Data data) { - int skipped = 0; - boolean compensating = state.isCompensating(); - int direction = compensating ? -1 : +1; - for (int i = state.getCurrentlyExecuting() + direction; i >= 0 && i < sagaSteps.size(); i = i + direction) { - SagaStep step = sagaSteps.get(i); - if ((compensating ? step.hasCompensation(data) : step.hasAction(data))) { - return new StepToExecute<>(Optional.of(step), skipped, compensating); - } else - skipped++; - } - return new StepToExecute<>(Optional.empty(), skipped, compensating); - } - - private SagaActions executeNextStep(Data data, SagaExecutionState state) { - StepToExecute stepToExecute = nextStepToExecute(state, data); - if (stepToExecute.isEmpty()) { + protected SagaActions executeNextStep(Data data, SagaExecutionState state) { + Optional> stepToExecute = nextStepToExecute(state, data); + if (!stepToExecute.isPresent()) { return makeEndStateSagaActions(state); } else { // do something - return stepToExecute.executeStep(data, state); + return stepToExecute.get().executeStep(data, state); } } - private void invokeReplyHandler(Message message, Data data, BiConsumer handler) { - Class m; - try { - String className = message.getRequiredHeader(ReplyMessageHeaders.REPLY_TYPE); - m = Class.forName(className, true, Thread.currentThread().getContextClassLoader()); - } catch (ClassNotFoundException e) { - logger.error("Class not found", e); - throw new RuntimeException("Class not found", e); - } - Object reply = JSonMapper.fromJson(message.getPayload(), m); - handler.accept(data, reply); - } - private SagaActions makeEndStateSagaActions(SagaExecutionState state) { - return SagaActions.builder() - .withUpdatedState(SagaExecutionStateJsonSerde.encodeState(SagaExecutionState.makeEndState())) - .withIsEndState(true) - .withIsCompensating(state.isCompensating()) - .build(); + @Override + protected StepToExecute makeStepToExecute(int skipped, boolean compensating, SagaStep step) { + return new StepToExecute<>(step, skipped, compensating); } - } diff --git a/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/StepToExecute.java b/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/StepToExecute.java index bcea0e3..b3de3c3 100644 --- a/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/StepToExecute.java +++ b/eventuate-tram-sagas-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/simpledsl/StepToExecute.java @@ -2,44 +2,23 @@ import io.eventuate.tram.sagas.orchestration.SagaActions; -import java.util.Optional; +public class StepToExecute extends AbstractStepToExecute> { -import static io.eventuate.tram.sagas.simpledsl.SagaExecutionStateJsonSerde.encodeState; -public class StepToExecute { - private final Optional> step; - private final int skipped; - private final boolean compensating; - - - public StepToExecute(Optional> step, int skipped, boolean compensating) { - this.compensating = compensating; - this.step = step; - this.skipped = skipped; + public StepToExecute(SagaStep step, int skipped, boolean compensating) { + super(step, skipped, compensating); } - private int size() { - return step.map(x -> 1).orElse(0) + skipped; - } - - public boolean isEmpty() { - return !step.isPresent(); - } - public SagaActions executeStep(Data data, SagaExecutionState currentState) { SagaExecutionState newState = currentState.nextState(size()); SagaActions.Builder builder = SagaActions.builder(); boolean compensating = currentState.isCompensating(); - step.get().makeStepOutcome(data, this.compensating).visit(builder::withIsLocal, builder::withCommands); + step.makeStepOutcome(data, this.compensating).visit(builder::withIsLocal, builder::withCommands); - return builder - .withUpdatedSagaData(data) - .withUpdatedState(encodeState(newState)) - .withIsEndState(newState.isEndState()) - .withIsCompensating(compensating) - .build(); + return makeSagaActions(builder, data, newState, compensating); } + } diff --git a/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/JdbcSqlQueryRow.java b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/JdbcSqlQueryRow.java new file mode 100644 index 0000000..61ffb07 --- /dev/null +++ b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/JdbcSqlQueryRow.java @@ -0,0 +1,30 @@ +package io.eventuate.tram.sagas.orchestration; + +import java.sql.ResultSet; +import java.sql.SQLException; + +class JdbcSqlQueryRow implements SqlQueryRow { + private final ResultSet rs; + + public JdbcSqlQueryRow(ResultSet rs) { + this.rs = rs; + } + + @Override + public String getString(String name) { + try { + return rs.getString(name); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean getBoolean(String name) { + try { + return rs.getBoolean(name); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } +} diff --git a/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/Saga.java b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/Saga.java index c24632d..e60713e 100644 --- a/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/Saga.java +++ b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/Saga.java @@ -12,5 +12,5 @@ default String getSagaType() { default void onStarting(String sagaId, Data data) { } default void onSagaCompletedSuccessfully(String sagaId, Data data) { } default void onSagaRolledBack(String sagaId, Data data) { } - + default void onSagaFailed(String sagaId, Data data) {}; } diff --git a/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaActions.java b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaActions.java index ba40df2..a94f54d 100644 --- a/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaActions.java +++ b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaActions.java @@ -5,7 +5,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; -import java.util.function.Consumer; public class SagaActions { @@ -17,10 +16,11 @@ public class SagaActions { private boolean compensating; private boolean local; private Optional localException; + private boolean failed; public SagaActions(List commands, Optional updatedSagaData, - Optional updatedState, boolean endState, boolean compensating, boolean local, Optional localException) { + Optional updatedState, boolean endState, boolean compensating, boolean failed, boolean local, Optional localException) { this.commands = commands; this.updatedSagaData = updatedSagaData; this.updatedState = updatedState; @@ -28,6 +28,7 @@ public SagaActions(List commands, this.compensating = compensating; this.local = local; this.localException = localException; + this.failed = failed; } public List getCommands() { @@ -54,6 +55,10 @@ public boolean isLocal() { return local; } + public boolean isFailed() { + return failed; + } + public Optional getLocalException() { return localException; } @@ -65,13 +70,14 @@ public static class Builder { private boolean endState; private boolean compensating; private boolean local; + private boolean failed; private Optional localException = Optional.empty(); public Builder() { } public SagaActions build() { - return new SagaActions<>(commands, updatedSagaData, updatedState, endState, compensating, local, localException); + return new SagaActions<>(commands, updatedSagaData, updatedState, endState, compensating, failed, local, localException); } public Builder withCommand(CommandWithDestination command) { @@ -99,6 +105,11 @@ public Builder withIsEndState(boolean endState) { return this; } + public Builder withIsFailed(boolean failed) { + this.failed = failed; + return this; + } + public Builder withIsCompensating(boolean compensating) { this.compensating = compensating; return this; @@ -110,6 +121,15 @@ public Builder withIsLocal(Optional localException) { this.local = true; return this; } + + public SagaActions buildActions(Data data, boolean compensating, String state, boolean endState) { + return withUpdatedSagaData(data) + .withUpdatedState(state) + .withIsEndState(endState) + .withIsCompensating(compensating) + .build(); + } + } diff --git a/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaDefinition.java b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaDefinition.java index e158819..9e249a0 100644 --- a/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaDefinition.java +++ b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaDefinition.java @@ -7,6 +7,6 @@ public interface SagaDefinition { SagaActions start(Data sagaData); - SagaActions handleReply(String currentState, Data sagaData, Message message); + SagaActions handleReply(String sagaType, String sagaId, String currentState, Data sagaData, Message message); } diff --git a/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaInstance.java b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaInstance.java index dbf0dcd..916c2eb 100644 --- a/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaInstance.java +++ b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaInstance.java @@ -12,6 +12,7 @@ public class SagaInstance { private Set destinationsAndResources; private Boolean endState = false; private Boolean compensating = false; + private Boolean failed = false; public void setSagaType(String sagaType) { this.sagaType = sagaType; @@ -25,6 +26,14 @@ public void setStateName(String stateName) { this.stateName = stateName; } + public SagaInstance(String sagaType, String sagaId, String stateName, String lastRequestId, SerializedSagaData serializedSagaData, Set destinationsAndResources, + boolean endState, boolean compensating, boolean failed) { + this(sagaType, sagaId, stateName, lastRequestId, serializedSagaData, destinationsAndResources); + + this.endState = endState; + this.compensating = compensating; + this.failed = failed; + } public SagaInstance(String sagaType, String sagaId, String stateName, String lastRequestId, SerializedSagaData serializedSagaData, Set destinationsAndResources) { this.sagaType = sagaType; this.id = sagaId; @@ -86,4 +95,12 @@ public void setCompensating(Boolean compensating) { public Boolean isCompensating() { return compensating; } + + public void setFailed(boolean failed) { + this.failed = failed; + } + + public Boolean isFailed() { + return failed; + } } diff --git a/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaInstanceRepositoryJdbc.java b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaInstanceRepositoryJdbc.java index 806457d..a8aec12 100644 --- a/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaInstanceRepositoryJdbc.java +++ b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaInstanceRepositoryJdbc.java @@ -4,9 +4,9 @@ import io.eventuate.common.jdbc.EventuateDuplicateKeyException; import io.eventuate.common.jdbc.EventuateJdbcStatementExecutor; import io.eventuate.common.jdbc.EventuateSchema; -import io.eventuate.tram.sagas.common.SagaInstanceRepositorySql; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import java.util.HashSet; import java.util.Set; @@ -32,15 +32,9 @@ public SagaInstanceRepositoryJdbc(EventuateJdbcStatementExecutor eventuateJdbcSt public void save(SagaInstance sagaInstance) { sagaInstance.setId(idGenerator.genId(null).asString()); logger.info("Saving {} {}", sagaInstance.getSagaType(), sagaInstance.getId()); + eventuateJdbcStatementExecutor.update(sagaInstanceRepositorySql.getInsertIntoSagaInstanceSql(), - sagaInstance.getSagaType(), - sagaInstance.getId(), - sagaInstance.getStateName(), - sagaInstance.getLastRequestId(), - sagaInstance.getSerializedSagaData().getSagaDataType(), - sagaInstance.getSerializedSagaData().getSagaDataJSON(), - sagaInstance.isEndState(), - sagaInstance.isCompensating()); + sagaInstanceRepositorySql.makeSaveArgs(sagaInstance)); saveDestinationsAndResources(sagaInstance); } @@ -77,10 +71,7 @@ public SagaInstance find(String sagaType, String sagaId) { return eventuateJdbcStatementExecutor.query( sagaInstanceRepositorySql.getSelectFromSagaInstanceSql(), - (rs, rownum) -> - new SagaInstance(sagaType, sagaId, rs.getString("state_name"), - rs.getString("last_request_id"), - new SerializedSagaData(rs.getString("saga_data_type"), rs.getString("saga_data_json")), destinationsAndResources), + (rs, rownum) -> sagaInstanceRepositorySql.mapToSagaInstance(sagaType, sagaId, destinationsAndResources, new JdbcSqlQueryRow(rs)), sagaType, sagaId).stream().findFirst().orElseThrow( () -> new RuntimeException(String.format("Cannot find saga instance %s %s", sagaType, sagaId))); } @@ -89,12 +80,7 @@ public SagaInstance find(String sagaType, String sagaId) { public void update(SagaInstance sagaInstance) { logger.info("Updating {} {}", sagaInstance.getSagaType(), sagaInstance.getId()); int count = eventuateJdbcStatementExecutor.update(sagaInstanceRepositorySql.getUpdateSagaInstanceSql(), - sagaInstance.getStateName(), - sagaInstance.getLastRequestId(), - sagaInstance.getSerializedSagaData().getSagaDataType(), - sagaInstance.getSerializedSagaData().getSagaDataJSON(), - sagaInstance.isEndState(), sagaInstance.isCompensating(), - sagaInstance.getSagaType(), sagaInstance.getId()); + sagaInstanceRepositorySql.makeUpdateArgs(sagaInstance)); if (count != 1) { throw new RuntimeException("Should be 1 : " + count); @@ -102,4 +88,5 @@ public void update(SagaInstance sagaInstance) { saveDestinationsAndResources(sagaInstance); } + } diff --git a/eventuate-tram-sagas-common/src/main/java/io/eventuate/tram/sagas/common/SagaInstanceRepositorySql.java b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaInstanceRepositorySql.java similarity index 50% rename from eventuate-tram-sagas-common/src/main/java/io/eventuate/tram/sagas/common/SagaInstanceRepositorySql.java rename to eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaInstanceRepositorySql.java index a8f7aef..489be78 100644 --- a/eventuate-tram-sagas-common/src/main/java/io/eventuate/tram/sagas/common/SagaInstanceRepositorySql.java +++ b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaInstanceRepositorySql.java @@ -1,7 +1,9 @@ -package io.eventuate.tram.sagas.common; +package io.eventuate.tram.sagas.orchestration; import io.eventuate.common.jdbc.EventuateSchema; +import java.util.Set; + public class SagaInstanceRepositorySql { private String insertIntoSagaInstanceSql; @@ -14,13 +16,13 @@ public SagaInstanceRepositorySql(EventuateSchema eventuateSchema) { String sagaInstanceTable = eventuateSchema.qualifyTable("saga_instance"); String sagaInstanceParticipantsTable = eventuateSchema.qualifyTable("saga_instance_participants"); - insertIntoSagaInstanceSql = String.format("INSERT INTO %s(saga_type, saga_id, state_name, last_request_id, saga_data_type, saga_data_json, end_state, compensating) VALUES(?, ?, ?, ?, ?, ?, ?, ?)", sagaInstanceTable); + insertIntoSagaInstanceSql = String.format("INSERT INTO %s(saga_type, saga_id, state_name, last_request_id, saga_data_type, saga_data_json, end_state, compensating, failed) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)", sagaInstanceTable); insertIntoSagaInstanceParticipantsSql = String.format("INSERT INTO %s(saga_type, saga_id, destination, resource) values(?,?,?,?)", sagaInstanceParticipantsTable); selectFromSagaInstanceSql = String.format("SELECT * FROM %s WHERE saga_type = ? AND saga_id = ?", sagaInstanceTable); selectFromSagaInstanceParticipantsSql = String.format("SELECT destination, resource FROM %s WHERE saga_type = ? AND saga_id = ?", sagaInstanceParticipantsTable); - updateSagaInstanceSql = String.format("UPDATE %s SET state_name = ?, last_request_id = ?, saga_data_type = ?, saga_data_json = ?, end_state = ?, compensating = ? where saga_type = ? AND saga_id = ?", sagaInstanceTable); + updateSagaInstanceSql = String.format("UPDATE %s SET state_name = ?, last_request_id = ?, saga_data_type = ?, saga_data_json = ?, end_state = ?, compensating = ?, failed = ? where saga_type = ? AND saga_id = ?", sagaInstanceTable); } public String getInsertIntoSagaInstanceSql() { @@ -42,4 +44,36 @@ public String getSelectFromSagaInstanceParticipantsSql() { public String getUpdateSagaInstanceSql() { return updateSagaInstanceSql; } + + public Object[] makeSaveArgs(SagaInstance sagaInstance) { + return new Object[]{sagaInstance.getSagaType(), + sagaInstance.getId(), + sagaInstance.getStateName(), + sagaInstance.getLastRequestId(), + sagaInstance.getSerializedSagaData().getSagaDataType(), + sagaInstance.getSerializedSagaData().getSagaDataJSON(), + sagaInstance.isEndState(), + sagaInstance.isCompensating(), + sagaInstance.isFailed()}; + } + + public Object[] makeUpdateArgs(SagaInstance sagaInstance) { + return new Object[]{sagaInstance.getStateName(), + sagaInstance.getLastRequestId(), + sagaInstance.getSerializedSagaData().getSagaDataType(), + sagaInstance.getSerializedSagaData().getSagaDataJSON(), + sagaInstance.isEndState(), sagaInstance.isCompensating(), sagaInstance.isFailed(), + sagaInstance.getSagaType(), sagaInstance.getId()}; + } + + public SagaInstance mapToSagaInstance(String sagaType, String sagaId, Set destinationsAndResources, SqlQueryRow rs) { + return new SagaInstance(sagaType, sagaId, rs.getString("state_name"), + rs.getString("last_request_id"), + new SerializedSagaData(rs.getString("saga_data_type"), rs.getString("saga_data_json")), + destinationsAndResources, + rs.getBoolean("end_state"), + rs.getBoolean("compensating"), + rs.getBoolean("failed") + ); + } } diff --git a/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaManagerImpl.java b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaManagerImpl.java index 5b49058..e962992 100644 --- a/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaManagerImpl.java +++ b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SagaManagerImpl.java @@ -102,13 +102,13 @@ public SagaInstance create(Data sagaData, Optional resource) { throw e; }); - processActions(sagaId, sagaInstance, sagaData, actions); + processActions(saga.getSagaType(), sagaId, sagaInstance, sagaData, actions); return sagaInstance; } - private void performEndStateActions(String sagaId, SagaInstance sagaInstance, boolean compensating, Data sagaData) { + private void performEndStateActions(String sagaId, SagaInstance sagaInstance, boolean compensating, boolean failed, Data sagaData) { for (DestinationAndResource dr : sagaInstance.getDestinationsAndResources()) { Map headers = new HashMap<>(); headers.put(SagaCommandHeaders.SAGA_ID, sagaId); @@ -116,6 +116,8 @@ private void performEndStateActions(String sagaId, SagaInstance sagaInstance, bo commandProducer.send(dr.getDestination(), dr.getResource(), new SagaUnlockCommand(), makeSagaReplyChannel(), headers); } + if (failed) + saga.onSagaFailed(sagaId, sagaData); if (compensating) saga.onSagaRolledBack(sagaId, sagaData); else @@ -182,23 +184,23 @@ private void handleReply(Message message) { logger.info("Current state={}", currentState); - SagaActions actions = getStateDefinition().handleReply(currentState, sagaData, message); + SagaActions actions = getStateDefinition().handleReply(sagaType, sagaId, currentState, sagaData, message); logger.info("Handled reply. Sending commands {}", actions.getCommands()); - processActions(sagaId, sagaInstance, sagaData, actions); + processActions(sagaType, sagaId, sagaInstance, sagaData, actions); } - private void processActions(String sagaId, SagaInstance sagaInstance, Data sagaData, SagaActions actions) { + private void processActions(String sagaType, String sagaId, SagaInstance sagaInstance, Data sagaData, SagaActions actions) { while (true) { if (actions.getLocalException().isPresent()) { - actions = getStateDefinition().handleReply(actions.getUpdatedState().get(), actions.getUpdatedSagaData().get(), MessageBuilder + actions = getStateDefinition().handleReply(sagaType, sagaId, actions.getUpdatedState().get(), actions.getUpdatedSagaData().get(), MessageBuilder .withPayload("{}") .withHeader(ReplyMessageHeaders.REPLY_OUTCOME, CommandReplyOutcome.FAILURE.name()) .withHeader(ReplyMessageHeaders.REPLY_TYPE, Failure.class.getName()) @@ -216,7 +218,7 @@ private void processActions(String sagaId, SagaInstance sagaInstance, Data sagaD sagaInstance.setSerializedSagaData(SagaDataSerde.serializeSagaData(actions.getUpdatedSagaData().orElse(sagaData))); if (actions.isEndState()) { - performEndStateActions(sagaId, sagaInstance, actions.isCompensating(), sagaData); + performEndStateActions(sagaId, sagaInstance, actions.isCompensating(), actions.isFailed(), sagaData); } sagaInstanceRepository.update(sagaInstance); @@ -224,7 +226,7 @@ private void processActions(String sagaId, SagaInstance sagaInstance, Data sagaD if (!actions.isLocal()) break; - actions = getStateDefinition().handleReply(actions.getUpdatedState().get(), actions.getUpdatedSagaData().get(), MessageBuilder + actions = getStateDefinition().handleReply(sagaType, sagaId, actions.getUpdatedState().get(), actions.getUpdatedSagaData().get(), MessageBuilder .withPayload("{}") .withHeader(ReplyMessageHeaders.REPLY_OUTCOME, CommandReplyOutcome.SUCCESS.name()) .withHeader(ReplyMessageHeaders.REPLY_TYPE, Success.class.getName()) @@ -238,6 +240,7 @@ private void updateState(SagaInstance sagaInstance, SagaActions actions) { sagaInstance.setStateName(stateName); sagaInstance.setEndState(actions.isEndState()); sagaInstance.setCompensating(actions.isCompensating()); + sagaInstance.setFailed(actions.isFailed()); }); } diff --git a/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SqlQueryRow.java b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SqlQueryRow.java new file mode 100644 index 0000000..cf34d4f --- /dev/null +++ b/eventuate-tram-sagas-orchestration/src/main/java/io/eventuate/tram/sagas/orchestration/SqlQueryRow.java @@ -0,0 +1,6 @@ +package io.eventuate.tram.sagas.orchestration; + +public interface SqlQueryRow { + String getString(String name); + boolean getBoolean(String name); +} diff --git a/eventuate-tram-sagas-orchestration/src/test/java/io/eventuate/tram/sagas/orchestration/SagaInstanceRepositoryJdbcCustomSchemaTest.java b/eventuate-tram-sagas-orchestration/src/test/java/io/eventuate/tram/sagas/orchestration/SagaInstanceRepositoryJdbcCustomSchemaTest.java index 019ab1d..9b16efa 100644 --- a/eventuate-tram-sagas-orchestration/src/test/java/io/eventuate/tram/sagas/orchestration/SagaInstanceRepositoryJdbcCustomSchemaTest.java +++ b/eventuate-tram-sagas-orchestration/src/test/java/io/eventuate/tram/sagas/orchestration/SagaInstanceRepositoryJdbcCustomSchemaTest.java @@ -1,7 +1,6 @@ package io.eventuate.tram.sagas.orchestration; import io.eventuate.common.jdbc.EventuateSchema; -import io.eventuate.tram.sagas.common.SagaInstanceRepositorySql; public class SagaInstanceRepositoryJdbcCustomSchemaTest extends SagaInstanceRepositoryJdbcSchemaTest { diff --git a/eventuate-tram-sagas-orchestration/src/test/java/io/eventuate/tram/sagas/orchestration/SagaInstanceRepositoryJdbcDefaultSchemaTest.java b/eventuate-tram-sagas-orchestration/src/test/java/io/eventuate/tram/sagas/orchestration/SagaInstanceRepositoryJdbcDefaultSchemaTest.java index db3f218..859a963 100644 --- a/eventuate-tram-sagas-orchestration/src/test/java/io/eventuate/tram/sagas/orchestration/SagaInstanceRepositoryJdbcDefaultSchemaTest.java +++ b/eventuate-tram-sagas-orchestration/src/test/java/io/eventuate/tram/sagas/orchestration/SagaInstanceRepositoryJdbcDefaultSchemaTest.java @@ -1,7 +1,6 @@ package io.eventuate.tram.sagas.orchestration; import io.eventuate.common.jdbc.EventuateSchema; -import io.eventuate.tram.sagas.common.SagaInstanceRepositorySql; public class SagaInstanceRepositoryJdbcDefaultSchemaTest extends SagaInstanceRepositoryJdbcSchemaTest { diff --git a/eventuate-tram-sagas-orchestration/src/test/java/io/eventuate/tram/sagas/orchestration/SagaInstanceRepositoryJdbcEmptySchemaTest.java b/eventuate-tram-sagas-orchestration/src/test/java/io/eventuate/tram/sagas/orchestration/SagaInstanceRepositoryJdbcEmptySchemaTest.java index 82a2fdd..e0d7ecb 100644 --- a/eventuate-tram-sagas-orchestration/src/test/java/io/eventuate/tram/sagas/orchestration/SagaInstanceRepositoryJdbcEmptySchemaTest.java +++ b/eventuate-tram-sagas-orchestration/src/test/java/io/eventuate/tram/sagas/orchestration/SagaInstanceRepositoryJdbcEmptySchemaTest.java @@ -1,7 +1,6 @@ package io.eventuate.tram.sagas.orchestration; import io.eventuate.common.jdbc.EventuateSchema; -import io.eventuate.tram.sagas.common.SagaInstanceRepositorySql; public class SagaInstanceRepositoryJdbcEmptySchemaTest extends SagaInstanceRepositoryJdbcSchemaTest { diff --git a/eventuate-tram-sagas-orchestration/src/test/java/io/eventuate/tram/sagas/orchestration/SagaInstanceRepositoryJdbcSchemaTest.java b/eventuate-tram-sagas-orchestration/src/test/java/io/eventuate/tram/sagas/orchestration/SagaInstanceRepositoryJdbcSchemaTest.java index 0c89563..ccce625 100644 --- a/eventuate-tram-sagas-orchestration/src/test/java/io/eventuate/tram/sagas/orchestration/SagaInstanceRepositoryJdbcSchemaTest.java +++ b/eventuate-tram-sagas-orchestration/src/test/java/io/eventuate/tram/sagas/orchestration/SagaInstanceRepositoryJdbcSchemaTest.java @@ -1,6 +1,5 @@ package io.eventuate.tram.sagas.orchestration; -import io.eventuate.tram.sagas.common.SagaInstanceRepositorySql; import org.junit.Assert; import org.junit.Test; @@ -33,7 +32,7 @@ public void testUpdateSagaInstance() { protected abstract SagaInstanceRepositorySql getSagaInstanceRepositoryJdbcSql(); private String getExpectedInsertIntoSagaInstance() { - return String.format("INSERT INTO %ssaga_instance(saga_type, saga_id, state_name, last_request_id, saga_data_type, saga_data_json, end_state, compensating) VALUES(?, ?, ?, ?, ?, ?, ?, ?)", getExpectedPrefix()); + return String.format("INSERT INTO %ssaga_instance(saga_type, saga_id, state_name, last_request_id, saga_data_type, saga_data_json, end_state, compensating, failed) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)", getExpectedPrefix()); } private String getExpectedInsertIntoSagaInstanceParticipants() { @@ -49,7 +48,7 @@ private String getExpectedSelectFromSagaInstanceParticipants() { } private String getExpectedUpdateSagaInstance() { - return String.format("UPDATE %ssaga_instance SET state_name = ?, last_request_id = ?, saga_data_type = ?, saga_data_json = ?, end_state = ?, compensating = ? where saga_type = ? AND saga_id = ?", getExpectedPrefix()); + return String.format("UPDATE %ssaga_instance SET state_name = ?, last_request_id = ?, saga_data_type = ?, saga_data_json = ?, end_state = ?, compensating = ?, failed = ? where saga_type = ? AND saga_id = ?", getExpectedPrefix()); } protected abstract String getExpectedPrefix(); diff --git a/eventuate-tram-sagas-orchestration/src/test/java/io/eventuate/tram/sagas/orchestration/SagaManagerImplTest.java b/eventuate-tram-sagas-orchestration/src/test/java/io/eventuate/tram/sagas/orchestration/SagaManagerImplTest.java index d36ae4d..1c15605 100644 --- a/eventuate-tram-sagas-orchestration/src/test/java/io/eventuate/tram/sagas/orchestration/SagaManagerImplTest.java +++ b/eventuate-tram-sagas-orchestration/src/test/java/io/eventuate/tram/sagas/orchestration/SagaManagerImplTest.java @@ -10,8 +10,8 @@ import io.eventuate.tram.messaging.consumer.MessageConsumer; import io.eventuate.tram.messaging.consumer.MessageHandler; import io.eventuate.tram.messaging.producer.MessageBuilder; -import io.eventuate.tram.sagas.common.SagaReplyHeaders; import io.eventuate.tram.sagas.common.SagaLockManager; +import io.eventuate.tram.sagas.common.SagaReplyHeaders; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -186,7 +186,7 @@ private void handleReply(boolean compensating) { when(sagaInstanceRepository.find(sagaType, sagaId)) .thenReturn(sagaInstance); - when(sagaDefinition.handleReply(anyString(), any(TestSagaData.class), any(Message.class))) + when(sagaDefinition.handleReply(eq(sagaType), eq(sagaId), anyString(), any(TestSagaData.class), any(Message.class))) .thenReturn(makeSecondSagaActions(compensating)); when(sagaCommandProducer.sendCommands(anyString(), anyString(), anyList(), anyString())).thenReturn diff --git a/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveSagaStep.java b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveSagaStep.java index 6050340..d5566ce 100644 --- a/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveSagaStep.java +++ b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveSagaStep.java @@ -1,20 +1,15 @@ package io.eventuate.tram.sagas.reactive.simpledsl; import io.eventuate.tram.messaging.common.Message; +import io.eventuate.tram.sagas.simpledsl.ISagaStep; import io.eventuate.tram.sagas.simpledsl.StepOutcome; import org.reactivestreams.Publisher; import java.util.Optional; import java.util.function.BiFunction; -public interface ReactiveSagaStep { - boolean isSuccessfulReply(boolean compensating, Message message); - +public interface ReactiveSagaStep extends ISagaStep { Optional>> getReplyHandler(Message message, boolean compensating); Publisher makeStepOutcome(Data data, boolean compensating); - - boolean hasAction(Data data); - - boolean hasCompensation(Data data); } diff --git a/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveStepToExecute.java b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveStepToExecute.java index 9305bf0..f7ac46d 100644 --- a/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveStepToExecute.java +++ b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/ReactiveStepToExecute.java @@ -1,53 +1,33 @@ package io.eventuate.tram.sagas.reactive.simpledsl; import io.eventuate.tram.sagas.orchestration.SagaActions; +import io.eventuate.tram.sagas.simpledsl.AbstractStepToExecute; import io.eventuate.tram.sagas.simpledsl.SagaExecutionState; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; -import java.util.Optional; +public class ReactiveStepToExecute extends AbstractStepToExecute> { -import static io.eventuate.tram.sagas.simpledsl.SagaExecutionStateJsonSerde.encodeState; -public class ReactiveStepToExecute { - private final Optional> step; - private final int skipped; - private final boolean compensating; - - - public ReactiveStepToExecute(Optional> step, int skipped, boolean compensating) { - this.compensating = compensating; - this.step = step; - this.skipped = skipped; + public ReactiveStepToExecute(ReactiveSagaStep step, int skipped, boolean compensating) { + super(step, skipped, compensating); } - private int size() { - return step.map(x -> 1).orElse(0) + skipped; - } - - public boolean isEmpty() { - return !step.isPresent(); - } - public Publisher> executeStep(Data data, SagaExecutionState currentState) { SagaExecutionState newState = currentState.nextState(size()); SagaActions.Builder builder = SagaActions.builder(); boolean compensating = currentState.isCompensating(); return Mono - .from(step.get().makeStepOutcome(data, this.compensating)) - .map(step -> { - step.visit(builder::withIsLocal, builder::withCommands); - return step; + .from(step.makeStepOutcome(data, this.compensating)) + .map(outcome -> { + outcome.visit(builder::withIsLocal, builder::withCommands); + return outcome; }) .then(Mono.fromSupplier(() -> - builder - .withUpdatedSagaData(data) - .withUpdatedState(encodeState(newState)) - .withIsEndState(newState.isEndState()) - .withIsCompensating(compensating) - .build())); + makeSagaActions(builder, data, newState, compensating))); } + } diff --git a/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/SimpleReactiveSagaDefinition.java b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/SimpleReactiveSagaDefinition.java index b9ca892..f20d313 100644 --- a/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/SimpleReactiveSagaDefinition.java +++ b/eventuate-tram-sagas-reactive-orchestration-simple-dsl/src/main/java/io/eventuate/tram/sagas/reactive/simpledsl/SimpleReactiveSagaDefinition.java @@ -1,59 +1,58 @@ package io.eventuate.tram.sagas.reactive.simpledsl; -import io.eventuate.common.json.mapper.JSonMapper; -import io.eventuate.tram.commands.common.ReplyMessageHeaders; import io.eventuate.tram.messaging.common.Message; import io.eventuate.tram.sagas.orchestration.SagaActions; import io.eventuate.tram.sagas.reactive.orchestration.ReactiveSagaDefinition; +import io.eventuate.tram.sagas.simpledsl.AbstractSimpleSagaDefinition; import io.eventuate.tram.sagas.simpledsl.SagaExecutionState; import io.eventuate.tram.sagas.simpledsl.SagaExecutionStateJsonSerde; import org.reactivestreams.Publisher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; import java.util.List; import java.util.Optional; -import java.util.function.BiFunction; -public class SimpleReactiveSagaDefinition implements ReactiveSagaDefinition { - private Logger logger = LoggerFactory.getLogger(this.getClass()); +public class SimpleReactiveSagaDefinition + extends AbstractSimpleSagaDefinition, ReactiveStepToExecute> + implements ReactiveSagaDefinition { - private List> sagaSteps; public SimpleReactiveSagaDefinition(List> sagaSteps) { - this.sagaSteps = sagaSteps; + super(sagaSteps); } @Override public Publisher> start(Data sagaData) { SagaExecutionState currentState = new SagaExecutionState(-1, false); - ReactiveStepToExecute stepToExecute = nextStepToExecute(currentState, sagaData); + Optional> stepToExecute = nextStepToExecute(currentState, sagaData); - if (stepToExecute.isEmpty()) { + if (!stepToExecute.isPresent()) { return Mono.just(makeEndStateSagaActions(currentState)); } else { - return stepToExecute.executeStep(sagaData, currentState); + return stepToExecute.get().executeStep(sagaData, currentState); } } @Override - public Publisher> handleReply(String currentState, Data sagaData, Message message) { + public Publisher> handleReply(String sagaType, String sagaId, String currentState, Data sagaData, Message message) { SagaExecutionState state = SagaExecutionStateJsonSerde.decodeState(currentState); - ReactiveSagaStep currentStep = sagaSteps.get(state.getCurrentlyExecuting()); + ReactiveSagaStep currentStep = steps.get(state.getCurrentlyExecuting()); boolean compensating = state.isCompensating(); return currentStep .getReplyHandler(message, compensating) - .map(handler -> Mono.from(invokeReplyHandler(message, sagaData, handler))) + .map(handler -> { + Publisher source = invokeReplyHandler(message, sagaData, handler); + return Mono.from(source); + }) .orElse(Mono.empty()) .then(Mono.defer(() -> { if (currentStep.isSuccessfulReply(compensating, message)) { return Mono.from(executeNextStep(sagaData, state)); } else if (compensating) { - return Mono.error(new UnsupportedOperationException("Failure when compensating")); + return Mono.just(handleFailedCompensatingTransaction(sagaType, sagaId, state, message)); } else { return Mono.from(executeNextStep(sagaData, state.startCompensating())); } @@ -61,50 +60,19 @@ public Publisher> handleReply(String currentState, Data sagaDa } - - private ReactiveStepToExecute nextStepToExecute(SagaExecutionState state, Data data) { - int skipped = 0; - boolean compensating = state.isCompensating(); - int direction = compensating ? -1 : +1; - for (int i = state.getCurrentlyExecuting() + direction; i >= 0 && i < sagaSteps.size(); i = i + direction) { - ReactiveSagaStep step = sagaSteps.get(i); - if ((compensating ? step.hasCompensation(data) : step.hasAction(data))) { - return new ReactiveStepToExecute<>(Optional.of(step), skipped, compensating); - } else - skipped++; - } - return new ReactiveStepToExecute<>(Optional.empty(), skipped, compensating); + @Override + protected ReactiveStepToExecute makeStepToExecute(int skipped, boolean compensating, ReactiveSagaStep step) { + return new ReactiveStepToExecute<>(step, skipped, compensating) ; } private Publisher> executeNextStep(Data data, SagaExecutionState state) { - ReactiveStepToExecute stepToExecute = nextStepToExecute(state, data); - - if (stepToExecute.isEmpty()) { + Optional> stepToExecute = nextStepToExecute(state, data); + if (!stepToExecute.isPresent()) { return Mono.just(makeEndStateSagaActions(state)); } else { // do something - return stepToExecute.executeStep(data, state); + return stepToExecute.get().executeStep(data, state); } } - private Publisher invokeReplyHandler(Message message, Data data, BiFunction> handler) { - Class m; - try { - String className = message.getRequiredHeader(ReplyMessageHeaders.REPLY_TYPE); - m = Class.forName(className, true, Thread.currentThread().getContextClassLoader()); - } catch (ClassNotFoundException e) { - logger.error("Class not found", e); - throw new RuntimeException("Class not found", e); - } - Object reply = JSonMapper.fromJson(message.getPayload(), m); - return handler.apply(data, reply); - } - - private SagaActions makeEndStateSagaActions(SagaExecutionState state) { - return SagaActions.builder() - .withUpdatedState(SagaExecutionStateJsonSerde.encodeState(SagaExecutionState.makeEndState())) - .withIsEndState(true) - .withIsCompensating(state.isCompensating()) - .build(); - } } diff --git a/eventuate-tram-sagas-reactive-orchestration/src/main/java/io/eventuate/tram/sagas/reactive/orchestration/ReactiveSagaDefinition.java b/eventuate-tram-sagas-reactive-orchestration/src/main/java/io/eventuate/tram/sagas/reactive/orchestration/ReactiveSagaDefinition.java index 97d3274..df2004c 100644 --- a/eventuate-tram-sagas-reactive-orchestration/src/main/java/io/eventuate/tram/sagas/reactive/orchestration/ReactiveSagaDefinition.java +++ b/eventuate-tram-sagas-reactive-orchestration/src/main/java/io/eventuate/tram/sagas/reactive/orchestration/ReactiveSagaDefinition.java @@ -9,5 +9,5 @@ public interface ReactiveSagaDefinition { Publisher> start(Data sagaData); - Publisher> handleReply(String currentState, Data sagaData, Message message); + Publisher> handleReply(String sagaType, String sagaId, String currentState, Data sagaData, Message message); } diff --git a/eventuate-tram-sagas-reactive-orchestration/src/main/java/io/eventuate/tram/sagas/reactive/orchestration/ReactiveSagaInstanceRepositoryJdbc.java b/eventuate-tram-sagas-reactive-orchestration/src/main/java/io/eventuate/tram/sagas/reactive/orchestration/ReactiveSagaInstanceRepositoryJdbc.java index 1b3092a..208ba6b 100644 --- a/eventuate-tram-sagas-reactive-orchestration/src/main/java/io/eventuate/tram/sagas/reactive/orchestration/ReactiveSagaInstanceRepositoryJdbc.java +++ b/eventuate-tram-sagas-reactive-orchestration/src/main/java/io/eventuate/tram/sagas/reactive/orchestration/ReactiveSagaInstanceRepositoryJdbc.java @@ -4,10 +4,9 @@ import io.eventuate.common.jdbc.EventuateDuplicateKeyException; import io.eventuate.common.jdbc.EventuateSchema; import io.eventuate.common.reactive.jdbc.EventuateReactiveJdbcStatementExecutor; -import io.eventuate.tram.sagas.common.SagaInstanceRepositorySql; import io.eventuate.tram.sagas.orchestration.DestinationAndResource; import io.eventuate.tram.sagas.orchestration.SagaInstance; -import io.eventuate.tram.sagas.orchestration.SerializedSagaData; +import io.eventuate.tram.sagas.orchestration.SagaInstanceRepositorySql; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; @@ -43,14 +42,7 @@ public Mono save(SagaInstance sagaInstance) { return eventuateJdbcStatementExecutor .update(sagaInstanceRepositorySql.getInsertIntoSagaInstanceSql(), - sagaInstance.getSagaType(), - sagaInstance.getId(), - sagaInstance.getStateName(), - sagaInstance.getLastRequestId(), - sagaInstance.getSerializedSagaData().getSagaDataType(), - sagaInstance.getSerializedSagaData().getSagaDataJSON(), - sagaInstance.isEndState(), - sagaInstance.isCompensating()) + sagaInstanceRepositorySql.makeSaveArgs(sagaInstance)) .then(Mono.defer(() -> saveDestinationsAndResources(sagaInstance))); } @@ -90,9 +82,7 @@ public Mono find(String sagaType, String sagaId) { return destinationAndResources.collectList().flatMap(dar -> eventuateJdbcStatementExecutor.queryForList(sagaInstanceRepositorySql.getSelectFromSagaInstanceSql(), (row, rowMetadata) -> - new SagaInstance(sagaType, sagaId, row.get("state_name").toString(), - row.get("last_request_id").toString(), - new SerializedSagaData(row.get("saga_data_type").toString(), row.get("saga_data_json").toString()), new HashSet<>(dar)), + sagaInstanceRepositorySql.mapToSagaInstance(sagaType, sagaId, new HashSet<>(dar), new ReactiveSqlQueryRow(row)), sagaType, sagaId).single()); } @@ -100,13 +90,7 @@ public Mono find(String sagaType, String sagaId) { @Override public Mono update(SagaInstance sagaInstance) { return eventuateJdbcStatementExecutor - .update(sagaInstanceRepositorySql.getUpdateSagaInstanceSql(), - sagaInstance.getStateName(), - sagaInstance.getLastRequestId(), - sagaInstance.getSerializedSagaData().getSagaDataType(), - sagaInstance.getSerializedSagaData().getSagaDataJSON(), - sagaInstance.isEndState(), sagaInstance.isCompensating(), - sagaInstance.getSagaType(), sagaInstance.getId()) + .update(sagaInstanceRepositorySql.getUpdateSagaInstanceSql(), sagaInstanceRepositorySql.makeUpdateArgs(sagaInstance)) .flatMap(count -> { if (count != 1) { return Mono.error(new RuntimeException("Should be 1 : " + count)); diff --git a/eventuate-tram-sagas-reactive-orchestration/src/main/java/io/eventuate/tram/sagas/reactive/orchestration/ReactiveSagaManagerImpl.java b/eventuate-tram-sagas-reactive-orchestration/src/main/java/io/eventuate/tram/sagas/reactive/orchestration/ReactiveSagaManagerImpl.java index 001b89c..eb99393 100644 --- a/eventuate-tram-sagas-reactive-orchestration/src/main/java/io/eventuate/tram/sagas/reactive/orchestration/ReactiveSagaManagerImpl.java +++ b/eventuate-tram-sagas-reactive-orchestration/src/main/java/io/eventuate/tram/sagas/reactive/orchestration/ReactiveSagaManagerImpl.java @@ -1,10 +1,6 @@ package io.eventuate.tram.sagas.reactive.orchestration; -import io.eventuate.tram.commands.common.CommandMessageHeaders; -import io.eventuate.tram.commands.common.CommandReplyOutcome; -import io.eventuate.tram.commands.common.Failure; -import io.eventuate.tram.commands.common.ReplyMessageHeaders; -import io.eventuate.tram.commands.common.Success; +import io.eventuate.tram.commands.common.*; import io.eventuate.tram.consumer.common.reactive.ReactiveMessageConsumer; import io.eventuate.tram.messaging.common.Message; import io.eventuate.tram.messaging.producer.MessageBuilder; @@ -25,12 +21,7 @@ import reactor.core.publisher.Mono; import javax.annotation.PostConstruct; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; import static java.util.Collections.singleton; @@ -121,7 +112,7 @@ public Mono create(Data sagaData, Optional resource) { if (actions.getLocalException().isPresent()) return Mono.error(actions.getLocalException().get()); else return Mono.just(actions); }) - .flatMap(actions -> processActions(sagaInstance.getId(), sagaInstance, sagaData, Mono.just(actions))) + .flatMap(actions -> processActions(getSagaType(), sagaInstance.getId(), sagaInstance, sagaData, Mono.just(actions))) .then(Mono.fromSupplier(() -> sagaInstance)); } @@ -202,9 +193,9 @@ private Mono handleReply(Message message) { Data data = SagaDataSerde.deserializeSagaData(si.getSerializedSagaData()); - Mono> actions = Mono.from(getStateDefinition().handleReply(currentState, getSagaData(si), message)); + Mono> actions = Mono.from(getStateDefinition().handleReply(sagaType, sagaId, currentState, getSagaData(si), message)); - return processActions(sagaId, si, data, actions); + return processActions(sagaType, sagaId, si, data, actions); }) .then(); } @@ -213,12 +204,12 @@ private Data getSagaData(SagaInstance sagaInstance) { return SagaDataSerde.deserializeSagaData(sagaInstance.getSerializedSagaData()); } - private Mono> processActions(String sagaId, SagaInstance sagaInstance, Data sagaData, Mono> actions) { + private Mono> processActions(String sagaType, String sagaId, SagaInstance sagaInstance, Data sagaData, Mono> actions) { return actions.flatMap(acts -> { if (acts.getLocalException().isPresent()) { Mono> nextActions = Mono.from(getStateDefinition() .handleReply( - acts.getUpdatedState().get(), + sagaType, sagaId, acts.getUpdatedState().get(), acts.getUpdatedSagaData().get(), MessageBuilder .withPayload("{}") @@ -227,7 +218,7 @@ private Mono> processActions(String sagaId, SagaInstance sagaI .build() )); - return processActions(sagaId, sagaInstance, sagaData, nextActions); + return processActions(sagaType, sagaId, sagaInstance, sagaData, nextActions); } else { Mono> nextActions = sagaCommandProducer .sendCommands(this.getSagaType(), sagaId, acts.getCommands(), this.makeSagaReplyChannel()) @@ -247,7 +238,7 @@ private Mono> processActions(String sagaId, SagaInstance sagaI })) .flatMap(newActs -> Mono.from(getStateDefinition() - .handleReply(newActs.getUpdatedState().get(), + .handleReply(sagaType, sagaId, newActs.getUpdatedState().get(), newActs.getUpdatedSagaData().get(), MessageBuilder .withPayload("{}") @@ -255,7 +246,7 @@ private Mono> processActions(String sagaId, SagaInstance sagaI .withHeader(ReplyMessageHeaders.REPLY_TYPE, Success.class.getName()) .build()))); - return nextActions.flatMap(na -> processActions(sagaId, sagaInstance, sagaData, Mono.just(na))); + return nextActions.flatMap(na -> processActions(sagaType, sagaId, sagaInstance, sagaData, Mono.just(na))); } }); } diff --git a/eventuate-tram-sagas-reactive-orchestration/src/main/java/io/eventuate/tram/sagas/reactive/orchestration/ReactiveSqlQueryRow.java b/eventuate-tram-sagas-reactive-orchestration/src/main/java/io/eventuate/tram/sagas/reactive/orchestration/ReactiveSqlQueryRow.java new file mode 100644 index 0000000..ce84a3b --- /dev/null +++ b/eventuate-tram-sagas-reactive-orchestration/src/main/java/io/eventuate/tram/sagas/reactive/orchestration/ReactiveSqlQueryRow.java @@ -0,0 +1,25 @@ +package io.eventuate.tram.sagas.reactive.orchestration; + +import io.eventuate.tram.sagas.orchestration.SqlQueryRow; +import io.r2dbc.spi.Row; + +public class ReactiveSqlQueryRow implements SqlQueryRow { + private Row row; + + public ReactiveSqlQueryRow(Row row) { + this.row = row; + } + + @Override + public String getString(String name) { + return row.get(name, String.class); + } + + @Override + public boolean getBoolean(String name) { + Integer o = row.get(name, Integer.class); + return o != null && o > 0; + } + + +} diff --git a/eventuate-tram-sagas-spring-reactive-orchestration/src/test/java/io/eventuate/tram/sagas/orchestration/ReactiveSagaInstanceRepositoryJdbcIntegrationTest.java b/eventuate-tram-sagas-spring-reactive-orchestration/src/test/java/io/eventuate/tram/sagas/orchestration/ReactiveSagaInstanceRepositoryJdbcIntegrationTest.java index 0547f4e..9d59d09 100644 --- a/eventuate-tram-sagas-spring-reactive-orchestration/src/test/java/io/eventuate/tram/sagas/orchestration/ReactiveSagaInstanceRepositoryJdbcIntegrationTest.java +++ b/eventuate-tram-sagas-spring-reactive-orchestration/src/test/java/io/eventuate/tram/sagas/orchestration/ReactiveSagaInstanceRepositoryJdbcIntegrationTest.java @@ -22,19 +22,19 @@ import static java.util.Arrays.asList; import static java.util.Collections.singleton; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; @RunWith(SpringRunner.class) @SpringBootTest(classes = ReactiveSagaInstanceRepositoryJdbcIntegrationTest.Config.class, webEnvironment = SpringBootTest.WebEnvironment.NONE) @EnableAutoConfiguration public class ReactiveSagaInstanceRepositoryJdbcIntegrationTest { - private String sagaType = generateId(); - private String sagaState = generateId(); - private String lastRequestId = generateId(); - private String sagaDataType = generateId(); - private String destination = generateId(); - private String resource = generateId(); + private final String sagaType = generateId(); + private final String sagaState = generateId(); + private final String lastRequestId = generateId(); + private final String sagaDataType = generateId(); + private final String destination = generateId(); + private final String resource = generateId(); @Configuration @Import(EventuateCommonReactiveDatabaseConfiguration.class) @@ -71,6 +71,10 @@ private void assertSavedSagaInstance() { DestinationAndResource destinationAndResource = destinationAndResources.stream().findAny().get(); assertEquals(destination, destinationAndResource.getDestination()); assertEquals(resource, destinationAndResource.getResource()); + + assertFalse(sagaInstance.isEndState()); + assertFalse(sagaInstance.isFailed()); + assertFalse(sagaInstance.isCompensating()); } private void updateSagaInstance() { @@ -78,6 +82,7 @@ private void updateSagaInstance() { sagaInstance.setLastRequestId("UpdateLastId"); sagaInstance.setSerializedSagaData(new SerializedSagaData("UpdatedSagaType", "{\"value\" : \"updatedValue\"}")); sagaInstance.getDestinationsAndResources().add(new DestinationAndResource("newDestination", "newResource")); + sagaInstance.setEndState(true); sagaInstanceRepository.update(sagaInstance).block(); sagaInstance = sagaInstanceRepository.find(sagaType, sagaInstance.getId()).block(); } @@ -87,6 +92,9 @@ private void assertUpdatedSagaInstance() { assertEquals("UpdateLastId", sagaInstance.getLastRequestId()); assertEquals("UpdatedSagaType", sagaInstance.getSerializedSagaData().getSagaDataType()); assertEquals("{\"value\" : \"updatedValue\"}", sagaInstance.getSerializedSagaData().getSagaDataJSON()); + assertTrue(sagaInstance.isEndState()); + assertFalse(sagaInstance.isFailed()); + assertFalse(sagaInstance.isCompensating()); Set destinationAndResources = sagaInstance.getDestinationsAndResources(); assertEquals(new HashSet<>(asList(new DestinationAndResource(destination, resource), new DestinationAndResource("newDestination", "newResource"))), destinationAndResources); @@ -103,7 +111,8 @@ private SagaInstance createSagaInstance() { sagaState, lastRequestId, new SerializedSagaData(sagaDataType, "{}"), - singleton(new DestinationAndResource(destination, resource))); + singleton(new DestinationAndResource(destination, resource)), + false, false, false); } private String generateId() { diff --git a/mssql/5.tram-saga-schema.sql b/mssql/5.tram-saga-schema.sql index 456e677..d12710f 100644 --- a/mssql/5.tram-saga-schema.sql +++ b/mssql/5.tram-saga-schema.sql @@ -27,6 +27,7 @@ CREATE TABLE eventuate.saga_instance( last_request_id VARCHAR(100), end_state TINYINT, compensating TINYINT, + failed TINYINT, saga_data_type VARCHAR(1000) NOT NULL, saga_data_json VARCHAR(1000) NOT NULL, PRIMARY KEY(saga_type, saga_id) diff --git a/mysql/tram-saga-schema.sql b/mysql/tram-saga-schema.sql index 25949b8..530df49 100644 --- a/mysql/tram-saga-schema.sql +++ b/mysql/tram-saga-schema.sql @@ -21,6 +21,7 @@ CREATE TABLE saga_instance( last_request_id VARCHAR(100), end_state INT(1), compensating INT(1), + failed INT(1), saga_data_type VARCHAR(1000) NOT NULL, saga_data_json VARCHAR(1000) NOT NULL, PRIMARY KEY(saga_type, saga_id) diff --git a/orders-and-customers-spring/src/main/java/io/eventuate/examples/tram/sagas/ordersandcustomers/spring/orders/OrderConfiguration.java b/orders-and-customers-spring/src/main/java/io/eventuate/examples/tram/sagas/ordersandcustomers/spring/orders/OrderConfiguration.java index 54c5b8f..76d036e 100644 --- a/orders-and-customers-spring/src/main/java/io/eventuate/examples/tram/sagas/ordersandcustomers/spring/orders/OrderConfiguration.java +++ b/orders-and-customers-spring/src/main/java/io/eventuate/examples/tram/sagas/ordersandcustomers/spring/orders/OrderConfiguration.java @@ -5,21 +5,17 @@ import io.eventuate.examples.tram.sagas.ordersandcustomers.orders.sagas.createorder.CreateOrderSaga; import io.eventuate.examples.tram.sagas.ordersandcustomers.orders.sagas.createorder.CreateOrderSagaData; import io.eventuate.examples.tram.sagas.ordersandcustomers.orders.sagas.createorder.LocalCreateOrderSaga; -import io.eventuate.examples.tram.sagas.ordersandcustomers.orders.sagas.createorder.LocalCreateOrderSagaData; import io.eventuate.examples.tram.sagas.ordersandcustomers.orders.service.OrderCommandHandler; import io.eventuate.examples.tram.sagas.ordersandcustomers.orders.service.OrderService; import io.eventuate.tram.commands.consumer.CommandDispatcher; -import io.eventuate.tram.commands.producer.CommandProducer; -import io.eventuate.tram.spring.commands.consumer.TramCommandConsumerConfiguration; import io.eventuate.tram.events.publisher.DomainEventPublisher; -import io.eventuate.tram.messaging.consumer.MessageConsumer; -import io.eventuate.tram.sagas.common.SagaLockManager; -import io.eventuate.tram.sagas.orchestration.*; -import io.eventuate.tram.sagas.spring.orchestration.SagaOrchestratorConfiguration; +import io.eventuate.tram.sagas.orchestration.SagaInstanceFactory; import io.eventuate.tram.sagas.participant.SagaCommandDispatcherFactory; import io.eventuate.tram.spring.optimisticlocking.OptimisticLockingDecoratorConfiguration; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.domain.EntityScan; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; @@ -46,7 +42,20 @@ public OrderService orderService(OrderDao orderDao, @Bean public CreateOrderSaga createOrderSaga(DomainEventPublisher domainEventPublisher) { - return new CreateOrderSaga(domainEventPublisher); + return new CreateOrderSaga(domainEventPublisher) { + @Autowired + private ApplicationEventPublisher applicationEventPublisher; + + @Override + public void onStarting(String sagaId, CreateOrderSagaData createOrderSagaData) { + applicationEventPublisher.publishEvent(new SagaStartedEvent(this, sagaId)); + } + + @Override + public void onSagaFailed(String sagaId, CreateOrderSagaData createOrderSagaData) { + applicationEventPublisher.publishEvent(new SagaFailedEvent(this, sagaId)); + } + }; } @Bean diff --git a/orders-and-customers-spring/src/main/java/io/eventuate/examples/tram/sagas/ordersandcustomers/spring/orders/SagaFailedEvent.java b/orders-and-customers-spring/src/main/java/io/eventuate/examples/tram/sagas/ordersandcustomers/spring/orders/SagaFailedEvent.java new file mode 100644 index 0000000..82cb19d --- /dev/null +++ b/orders-and-customers-spring/src/main/java/io/eventuate/examples/tram/sagas/ordersandcustomers/spring/orders/SagaFailedEvent.java @@ -0,0 +1,10 @@ +package io.eventuate.examples.tram.sagas.ordersandcustomers.spring.orders; + +public class SagaFailedEvent extends SagaLifecycleEvent { + + public SagaFailedEvent(Object source, String sagaId) { + super(source, sagaId); + } + + +} diff --git a/orders-and-customers-spring/src/main/java/io/eventuate/examples/tram/sagas/ordersandcustomers/spring/orders/SagaLifecycleEvent.java b/orders-and-customers-spring/src/main/java/io/eventuate/examples/tram/sagas/ordersandcustomers/spring/orders/SagaLifecycleEvent.java new file mode 100644 index 0000000..cf854fc --- /dev/null +++ b/orders-and-customers-spring/src/main/java/io/eventuate/examples/tram/sagas/ordersandcustomers/spring/orders/SagaLifecycleEvent.java @@ -0,0 +1,16 @@ +package io.eventuate.examples.tram.sagas.ordersandcustomers.spring.orders; + +import org.springframework.context.ApplicationEvent; + +public class SagaLifecycleEvent extends ApplicationEvent { + protected String sagaId; + + public SagaLifecycleEvent(Object source, String sagaId) { + super(source); + this.sagaId = sagaId; + } + + public String getSagaId() { + return sagaId; + } +} diff --git a/orders-and-customers-spring/src/main/java/io/eventuate/examples/tram/sagas/ordersandcustomers/spring/orders/SagaStartedEvent.java b/orders-and-customers-spring/src/main/java/io/eventuate/examples/tram/sagas/ordersandcustomers/spring/orders/SagaStartedEvent.java new file mode 100644 index 0000000..42c376a --- /dev/null +++ b/orders-and-customers-spring/src/main/java/io/eventuate/examples/tram/sagas/ordersandcustomers/spring/orders/SagaStartedEvent.java @@ -0,0 +1,10 @@ +package io.eventuate.examples.tram.sagas.ordersandcustomers.spring.orders; + +public class SagaStartedEvent extends SagaLifecycleEvent { + + public SagaStartedEvent(Object source, String sagaId) { + super(source, sagaId); + } + + +} diff --git a/orders-and-customers-spring/src/test/java/io/eventuate/examples/tram/sagas/ordersandcustomers/spring/integrationtests/AbstractOrdersAndCustomersIntegrationTest.java b/orders-and-customers-spring/src/test/java/io/eventuate/examples/tram/sagas/ordersandcustomers/spring/integrationtests/AbstractOrdersAndCustomersIntegrationTest.java index ff2dec4..d618356 100644 --- a/orders-and-customers-spring/src/test/java/io/eventuate/examples/tram/sagas/ordersandcustomers/spring/integrationtests/AbstractOrdersAndCustomersIntegrationTest.java +++ b/orders-and-customers-spring/src/test/java/io/eventuate/examples/tram/sagas/ordersandcustomers/spring/integrationtests/AbstractOrdersAndCustomersIntegrationTest.java @@ -66,12 +66,16 @@ public void shouldRejectOrder() { Customer customer = customerService.createCustomer("Fred", creditLimit); Order order = createOrder(customer); - assertOrderState(order.getId(), OrderState.REJECTED); + assertOrderRejected(order); assertCreateOrderSagaRolledBack(order); } + protected void assertOrderRejected(Order order) { + assertOrderState(order.getId(), OrderState.REJECTED); + } + protected void assertCreateOrderSagaRolledBack(Order order) { Eventually.eventually(() -> { sagaEventsConsumer.assertEventReceived(CreateOrderSagaRolledBack.class, event -> { diff --git a/orders-and-customers-spring/src/test/java/io/eventuate/examples/tram/sagas/ordersandcustomers/spring/integrationtests/OrderCommandHandlerWithFailingCompensatingTransaction.java b/orders-and-customers-spring/src/test/java/io/eventuate/examples/tram/sagas/ordersandcustomers/spring/integrationtests/OrderCommandHandlerWithFailingCompensatingTransaction.java new file mode 100644 index 0000000..a5ed37b --- /dev/null +++ b/orders-and-customers-spring/src/test/java/io/eventuate/examples/tram/sagas/ordersandcustomers/spring/integrationtests/OrderCommandHandlerWithFailingCompensatingTransaction.java @@ -0,0 +1,31 @@ +package io.eventuate.examples.tram.sagas.ordersandcustomers.spring.integrationtests; + +import io.eventuate.examples.tram.sagas.ordersandcustomers.orders.domain.OrderDao; +import io.eventuate.examples.tram.sagas.ordersandcustomers.orders.sagas.participants.ApproveOrderCommand; +import io.eventuate.examples.tram.sagas.ordersandcustomers.orders.service.OrderCommandHandler; +import io.eventuate.examples.tram.sagas.ordersandcustomers.orders.service.RejectOrderCommand; +import io.eventuate.tram.commands.consumer.CommandHandlers; +import io.eventuate.tram.commands.consumer.CommandMessage; +import io.eventuate.tram.messaging.common.Message; +import io.eventuate.tram.sagas.participant.SagaCommandHandlersBuilder; + +import static io.eventuate.tram.commands.consumer.CommandHandlerReplyBuilder.withFailure; + +class OrderCommandHandlerWithFailingCompensatingTransaction extends OrderCommandHandler { + public OrderCommandHandlerWithFailingCompensatingTransaction(OrderDao orderDao) { + super(orderDao); + } + + @Override + public CommandHandlers commandHandlerDefinitions() { + return SagaCommandHandlersBuilder + .fromChannel("orderService") + .onMessage(ApproveOrderCommand.class, this::approve) + .onMessage(RejectOrderCommand.class, this::reject) + .build(); + } + + public Message reject(CommandMessage cm) { + return withFailure(); + } +} diff --git a/orders-and-customers-spring/src/test/java/io/eventuate/examples/tram/sagas/ordersandcustomers/spring/integrationtests/OrdersAndCustomersInMemoryFailingCompensatingTransactionIntegrationTest.java b/orders-and-customers-spring/src/test/java/io/eventuate/examples/tram/sagas/ordersandcustomers/spring/integrationtests/OrdersAndCustomersInMemoryFailingCompensatingTransactionIntegrationTest.java new file mode 100644 index 0000000..7fe6d1a --- /dev/null +++ b/orders-and-customers-spring/src/test/java/io/eventuate/examples/tram/sagas/ordersandcustomers/spring/integrationtests/OrdersAndCustomersInMemoryFailingCompensatingTransactionIntegrationTest.java @@ -0,0 +1,97 @@ +package io.eventuate.examples.tram.sagas.ordersandcustomers.spring.integrationtests; + +import io.eventuate.examples.tram.sagas.ordersandcustomers.commondomain.Money; +import io.eventuate.examples.tram.sagas.ordersandcustomers.customers.domain.Customer; +import io.eventuate.examples.tram.sagas.ordersandcustomers.customers.service.CustomerService; +import io.eventuate.examples.tram.sagas.ordersandcustomers.orders.domain.OrderDao; +import io.eventuate.examples.tram.sagas.ordersandcustomers.orders.sagas.createorder.CreateOrderSaga; +import io.eventuate.examples.tram.sagas.ordersandcustomers.orders.service.OrderCommandHandler; +import io.eventuate.examples.tram.sagas.ordersandcustomers.orders.service.OrderDetails; +import io.eventuate.examples.tram.sagas.ordersandcustomers.orders.service.OrderService; +import io.eventuate.examples.tram.sagas.ordersandcustomers.spring.orders.SagaFailedEvent; +import io.eventuate.examples.tram.sagas.ordersandcustomers.spring.orders.SagaStartedEvent; +import io.eventuate.tram.sagas.orchestration.SagaInstance; +import io.eventuate.tram.sagas.orchestration.SagaInstanceRepository; +import io.eventuate.util.test.async.Eventually; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +@RunWith(SpringRunner.class) +@SpringBootTest(classes = OrdersAndCustomersInMemoryFailingCompensatingTransactionIntegrationTest.Config.class) +public class OrdersAndCustomersInMemoryFailingCompensatingTransactionIntegrationTest { + + @Configuration + @Import(OrdersAndCustomersInMemoryIntegrationTestConfiguration.class) + public static class Config { + + @Bean + public OrderCommandHandler orderCommandHandler(OrderDao orderDao) { + return new OrderCommandHandlerWithFailingCompensatingTransaction(orderDao); + } + + @Bean + public SagaLifecycleEventListener sagaStartedEventListener() { + return new SagaLifecycleEventListener(); + } + + } + + @Autowired + protected CustomerService customerService; + + @Autowired + protected OrderService orderService; + + @Autowired + private SagaLifecycleEventListener sagaEventListener; + + @Autowired + private SagaInstanceRepository sagaInstanceRepository; + + @Autowired + private CreateOrderSaga createOrderSaga; + + @Test + public void shouldChangeStateOfSagaToFailedWhenCompensatingTransactionFails() { + sagaEventListener.clear(); + + createOrderThatExceedsCreateLimitAndStartSaga(); + + String sagaId = getSagaId(); + + assertSagaFailed(sagaId); + + } + + private void createOrderThatExceedsCreateLimitAndStartSaga() { + Customer customer = customerService.createCustomer("Fred", new Money("15.00")); + orderService.createOrder(new OrderDetails(customer.getId(), new Money("123.40"))); + } + + private String getSagaId() { + SagaStartedEvent startedEvent = sagaEventListener.expectEvent(SagaStartedEvent.class); + assertNotNull(startedEvent); + return startedEvent.getSagaId(); + } + + private void assertSagaFailed(String sagaId) { + String sagaType = createOrderSaga.getSagaType(); + Eventually.eventually(() -> { + SagaInstance s = sagaInstanceRepository.find(sagaType, sagaId); + assertTrue(s.isFailed()); + }); + + sagaEventListener.expectEvent(SagaFailedEvent.class); + + } + +} diff --git a/orders-and-customers-spring/src/test/java/io/eventuate/examples/tram/sagas/ordersandcustomers/spring/integrationtests/SagaLifecycleEventListener.java b/orders-and-customers-spring/src/test/java/io/eventuate/examples/tram/sagas/ordersandcustomers/spring/integrationtests/SagaLifecycleEventListener.java new file mode 100644 index 0000000..7c5c792 --- /dev/null +++ b/orders-and-customers-spring/src/test/java/io/eventuate/examples/tram/sagas/ordersandcustomers/spring/integrationtests/SagaLifecycleEventListener.java @@ -0,0 +1,30 @@ +package io.eventuate.examples.tram.sagas.ordersandcustomers.spring.integrationtests; + +import io.eventuate.examples.tram.sagas.ordersandcustomers.spring.orders.SagaLifecycleEvent; +import io.eventuate.examples.tram.sagas.ordersandcustomers.spring.orders.SagaStartedEvent; +import org.springframework.context.event.EventListener; + +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; + +public class SagaLifecycleEventListener { + + private LinkedBlockingDeque events = new LinkedBlockingDeque<>(); + + @EventListener(classes = {SagaStartedEvent.class, SagaStartedEvent.class}) + public void handleSagaStartedEvent(SagaLifecycleEvent event) { + events.add(event); + } + + public void clear() { + events.clear(); + } + + public T expectEvent(Class eventClass) { + try { + return (T) events.poll(500, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } +} diff --git a/postgres/tram-saga-schema.sql b/postgres/tram-saga-schema.sql index 69350f8..dea0cbb 100644 --- a/postgres/tram-saga-schema.sql +++ b/postgres/tram-saga-schema.sql @@ -18,6 +18,7 @@ CREATE TABLE eventuate.saga_instance( last_request_id VARCHAR(100), end_state BOOLEAN, compensating BOOLEAN, + failed BOOLEAN, saga_data_type VARCHAR(1000) NOT NULL, saga_data_json VARCHAR(1000) NOT NULL, PRIMARY KEY(saga_type, saga_id)