Skip to content

Commit

Permalink
#75 Handle scenario where compensating transaction returns a failure,…
Browse files Browse the repository at this point in the history
… Extracted common functionality out of non-reactive and reactive SagaDefinition, #77 Refactor to reduce copy/paste in SQL code
  • Loading branch information
cer committed Jun 1, 2022
1 parent 17a1a95 commit c4308af
Show file tree
Hide file tree
Showing 41 changed files with 605 additions and 279 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ CREATE TABLE IF NOT EXISTS eventuate.saga_instance(
last_request_id VARCHAR(100),
end_state INT(1),
compensating INT(1),
failed INT(1),
saga_data_type VARCHAR(1000) NOT NULL,
saga_data_json VARCHAR(1000) NOT NULL,
PRIMARY KEY(saga_type, saga_id)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package io.eventuate.tram.sagas.simpledsl;

import io.eventuate.common.json.mapper.JSonMapper;
import io.eventuate.tram.commands.common.ReplyMessageHeaders;
import io.eventuate.tram.messaging.common.Message;
import io.eventuate.tram.sagas.orchestration.SagaActions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;

public abstract class AbstractSimpleSagaDefinition<Data, Step extends ISagaStep<Data>,
ToExecute extends AbstractStepToExecute<Data, Step>> {

protected Logger logger = LoggerFactory.getLogger(this.getClass());

protected List<Step> steps;

public AbstractSimpleSagaDefinition(List<Step> steps) {
this.steps = steps;
}

protected SagaActions<Data> handleFailedCompensatingTransaction(String sagaType, String sagaId, SagaExecutionState state, Message message) {
logger.error("Saga {} {} failed due to failed compensating transaction {}", sagaType, sagaId, message);
return SagaActions.<Data>builder()
.withUpdatedState(SagaExecutionStateJsonSerde.encodeState(SagaExecutionState.makeFailedEndState()))
.withIsEndState(true)
.withIsCompensating(state.isCompensating())
.withIsFailed(true)
.build();
}

protected Optional<ToExecute> nextStepToExecute(SagaExecutionState state, Data data) {
int skipped = 0;
boolean compensating = state.isCompensating();
int direction = compensating ? -1 : +1;
for (int i = state.getCurrentlyExecuting() + direction; i >= 0 && i < steps.size(); i = i + direction) {
Step step = steps.get(i);
if ((compensating ? step.hasCompensation(data) : step.hasAction(data))) {
return Optional.of(makeStepToExecute(skipped, compensating, step));
} else
skipped++;
}
return Optional.empty();
}

protected <T> T invokeReplyHandler(Message message, Data data, BiFunction<Data, Object, T> handler) {
Class<?> m;
try {
String className = message.getRequiredHeader(ReplyMessageHeaders.REPLY_TYPE);
m = Class.forName(className, true, Thread.currentThread().getContextClassLoader());
} catch (ClassNotFoundException e) {
logger.error("Class not found", e);
throw new RuntimeException("Class not found", e);
}
Object reply = JSonMapper.fromJson(message.getPayload(), m);
return handler.apply(data, reply);
}

protected SagaActions<Data> makeEndStateSagaActions(SagaExecutionState state) {
return SagaActions.<Data>builder()
.withUpdatedState(SagaExecutionStateJsonSerde.encodeState(SagaExecutionState.makeEndState()))
.withIsEndState(true)
.withIsCompensating(state.isCompensating())
.build();
}

protected abstract ToExecute makeStepToExecute(int skipped, boolean compensating, Step step);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.eventuate.tram.sagas.simpledsl;

import io.eventuate.tram.sagas.orchestration.SagaActions;

import static io.eventuate.tram.sagas.simpledsl.SagaExecutionStateJsonSerde.encodeState;

public class AbstractStepToExecute<Data, SagaStep extends ISagaStep<Data>> {
protected final SagaStep step;
protected final int skipped;
protected final boolean compensating;

public AbstractStepToExecute(SagaStep step, int skipped, boolean compensating) {
this.step = step;
this.skipped = skipped;
this.compensating = compensating;
}

protected int size() {
return 1 + skipped;
}

protected SagaActions<Data> makeSagaActions(SagaActions.Builder<Data> builder, Data data, SagaExecutionState newState, boolean compensating) {
String state = encodeState(newState);
return builder.buildActions(data, compensating, state, newState.isEndState());
}



}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.eventuate.tram.sagas.simpledsl;

import io.eventuate.tram.messaging.common.Message;

public interface ISagaStep<Data> {
boolean isSuccessfulReply(boolean compensating, Message message);

boolean hasAction(Data data);

boolean hasCompensation(Data data);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ public class SagaExecutionState {
private int currentlyExecuting;
private boolean compensating;
private boolean endState;
private boolean failed;


@Override
public String toString() {
Expand Down Expand Up @@ -53,9 +55,25 @@ public void setEndState(boolean endState) {
this.endState = endState;
}

public void setFailed(boolean failed) {
this.failed = failed;
}

public boolean isFailed() {
return failed;
}

public static SagaExecutionState makeEndState() {
SagaExecutionState x = new SagaExecutionState();
x.setEndState(true);
return x;
}

public static SagaExecutionState makeFailedEndState() {
SagaExecutionState x = new SagaExecutionState();
x.setEndState(true);
x.setFailed(true);
return x;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,10 @@
import java.util.Optional;
import java.util.function.BiConsumer;

public interface SagaStep<Data> {
boolean isSuccessfulReply(boolean compensating, Message message);
public interface SagaStep<Data> extends ISagaStep<Data> {

Optional<BiConsumer<Data, Object>> getReplyHandler(Message message, boolean compensating);

StepOutcome makeStepOutcome(Data data, boolean compensating);

boolean hasAction(Data data);

boolean hasCompensation(Data data);
}
Original file line number Diff line number Diff line change
@@ -1,104 +1,69 @@
package io.eventuate.tram.sagas.simpledsl;

import io.eventuate.common.json.mapper.JSonMapper;
import io.eventuate.tram.commands.common.ReplyMessageHeaders;
import io.eventuate.tram.messaging.common.Message;
import io.eventuate.tram.sagas.orchestration.SagaActions;
import io.eventuate.tram.sagas.orchestration.SagaDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;

public class SimpleSagaDefinition<Data> implements SagaDefinition<Data> {
private Logger logger = LoggerFactory.getLogger(this.getClass());
public class SimpleSagaDefinition<Data>
extends AbstractSimpleSagaDefinition<Data, SagaStep<Data>, StepToExecute<Data>>
implements SagaDefinition<Data> {

private List<SagaStep<Data>> sagaSteps;

public SimpleSagaDefinition(List<SagaStep<Data>> sagaSteps) {
this.sagaSteps = sagaSteps;
public SimpleSagaDefinition(List<SagaStep<Data>> steps) {
super(steps);
}

@Override
public SagaActions<Data> start(Data sagaData) {
SagaExecutionState currentState = new SagaExecutionState(-1, false);

StepToExecute<Data> stepToExecute = nextStepToExecute(currentState, sagaData);
Optional<StepToExecute<Data>> stepToExecute = nextStepToExecute(currentState, sagaData);

if (stepToExecute.isEmpty()) {
if (!stepToExecute.isPresent()) {
return makeEndStateSagaActions(currentState);
} else
return stepToExecute.executeStep(sagaData, currentState);
return stepToExecute.get().executeStep(sagaData, currentState);
}

@Override
public SagaActions<Data> handleReply(String currentState, Data sagaData, Message message) {
public SagaActions<Data> handleReply(String sagaType, String sagaId, String currentState, Data sagaData, Message message) {

SagaExecutionState state = SagaExecutionStateJsonSerde.decodeState(currentState);
SagaStep<Data> currentStep = sagaSteps.get(state.getCurrentlyExecuting());
SagaStep<Data> currentStep = steps.get(state.getCurrentlyExecuting());
boolean compensating = state.isCompensating();

currentStep.getReplyHandler(message, compensating).ifPresent(handler -> {
invokeReplyHandler(message, sagaData, handler);
});
currentStep.getReplyHandler(message, compensating).ifPresent(handler -> invokeReplyHandler(message, sagaData, (d, m) -> {
handler.accept(d, m);
return null;
}));

if (currentStep.isSuccessfulReply(compensating, message)) {
return executeNextStep(sagaData, state);
} else if (compensating) {
throw new UnsupportedOperationException("Failure when compensating");
return handleFailedCompensatingTransaction(sagaType, sagaId, state, message);
} else {
return executeNextStep(sagaData, state.startCompensating());
}
}



private StepToExecute<Data> nextStepToExecute(SagaExecutionState state, Data data) {
int skipped = 0;
boolean compensating = state.isCompensating();
int direction = compensating ? -1 : +1;
for (int i = state.getCurrentlyExecuting() + direction; i >= 0 && i < sagaSteps.size(); i = i + direction) {
SagaStep<Data> step = sagaSteps.get(i);
if ((compensating ? step.hasCompensation(data) : step.hasAction(data))) {
return new StepToExecute<>(Optional.of(step), skipped, compensating);
} else
skipped++;
}
return new StepToExecute<>(Optional.empty(), skipped, compensating);
}

private SagaActions<Data> executeNextStep(Data data, SagaExecutionState state) {
StepToExecute<Data> stepToExecute = nextStepToExecute(state, data);
if (stepToExecute.isEmpty()) {
protected SagaActions<Data> executeNextStep(Data data, SagaExecutionState state) {
Optional<StepToExecute<Data>> stepToExecute = nextStepToExecute(state, data);
if (!stepToExecute.isPresent()) {
return makeEndStateSagaActions(state);
} else {
// do something
return stepToExecute.executeStep(data, state);
return stepToExecute.get().executeStep(data, state);
}
}

private void invokeReplyHandler(Message message, Data data, BiConsumer<Data, Object> handler) {
Class m;
try {
String className = message.getRequiredHeader(ReplyMessageHeaders.REPLY_TYPE);
m = Class.forName(className, true, Thread.currentThread().getContextClassLoader());
} catch (ClassNotFoundException e) {
logger.error("Class not found", e);
throw new RuntimeException("Class not found", e);
}
Object reply = JSonMapper.fromJson(message.getPayload(), m);
handler.accept(data, reply);
}

private SagaActions<Data> makeEndStateSagaActions(SagaExecutionState state) {
return SagaActions.<Data>builder()
.withUpdatedState(SagaExecutionStateJsonSerde.encodeState(SagaExecutionState.makeEndState()))
.withIsEndState(true)
.withIsCompensating(state.isCompensating())
.build();
@Override
protected StepToExecute<Data> makeStepToExecute(int skipped, boolean compensating, SagaStep<Data> step) {
return new StepToExecute<>(step, skipped, compensating);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,23 @@

import io.eventuate.tram.sagas.orchestration.SagaActions;

import java.util.Optional;
public class StepToExecute<Data> extends AbstractStepToExecute<Data, SagaStep<Data>> {

import static io.eventuate.tram.sagas.simpledsl.SagaExecutionStateJsonSerde.encodeState;

public class StepToExecute<Data> {
private final Optional<SagaStep<Data>> step;
private final int skipped;
private final boolean compensating;


public StepToExecute(Optional<SagaStep<Data>> step, int skipped, boolean compensating) {
this.compensating = compensating;
this.step = step;
this.skipped = skipped;
public StepToExecute(SagaStep<Data> step, int skipped, boolean compensating) {
super(step, skipped, compensating);
}


private int size() {
return step.map(x -> 1).orElse(0) + skipped;
}

public boolean isEmpty() {
return !step.isPresent();
}

public SagaActions<Data> executeStep(Data data, SagaExecutionState currentState) {
SagaExecutionState newState = currentState.nextState(size());
SagaActions.Builder<Data> builder = SagaActions.builder();
boolean compensating = currentState.isCompensating();

step.get().makeStepOutcome(data, this.compensating).visit(builder::withIsLocal, builder::withCommands);
step.makeStepOutcome(data, this.compensating).visit(builder::withIsLocal, builder::withCommands);

return builder
.withUpdatedSagaData(data)
.withUpdatedState(encodeState(newState))
.withIsEndState(newState.isEndState())
.withIsCompensating(compensating)
.build();
return makeSagaActions(builder, data, newState, compensating);
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.eventuate.tram.sagas.orchestration;

import java.sql.ResultSet;
import java.sql.SQLException;

class JdbcSqlQueryRow implements SqlQueryRow {
private final ResultSet rs;

public JdbcSqlQueryRow(ResultSet rs) {
this.rs = rs;
}

@Override
public String getString(String name) {
try {
return rs.getString(name);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

@Override
public boolean getBoolean(String name) {
try {
return rs.getBoolean(name);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ default String getSagaType() {
default void onStarting(String sagaId, Data data) { }
default void onSagaCompletedSuccessfully(String sagaId, Data data) { }
default void onSagaRolledBack(String sagaId, Data data) { }

default void onSagaFailed(String sagaId, Data data) {};
}
Loading

0 comments on commit c4308af

Please sign in to comment.