Skip to content

Commit

Permalink
#86 Improve exception handling for local saga steps
Browse files Browse the repository at this point in the history
  • Loading branch information
cer committed Sep 7, 2022
1 parent b11bb74 commit e47f49c
Show file tree
Hide file tree
Showing 9 changed files with 242 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.eventuate.tram.sagas.simpledsl;

import java.util.function.BiConsumer;

public class LocalExceptionSaver<Data> {
private final Class<?> exceptionType;
private final BiConsumer<Data, RuntimeException> exceptionConsumer;

public LocalExceptionSaver(Class<?> exceptionType, BiConsumer<Data,RuntimeException> exceptionConsumer) {
this.exceptionType = exceptionType;
this.exceptionConsumer = exceptionConsumer;
}

public boolean shouldSave(Exception e) {
return exceptionType.isInstance(e);
}

public void save(Data data, RuntimeException e) {
exceptionConsumer.accept(data, e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,24 @@
import io.eventuate.tram.commands.common.ReplyMessageHeaders;
import io.eventuate.tram.messaging.common.Message;

import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import static io.eventuate.tram.sagas.simpledsl.StepOutcome.makeLocalOutcome;

public class LocalStep<Data> implements SagaStep<Data> {
private Consumer<Data> localFunction;
private Optional<Consumer<Data>> compensation;
private final Consumer<Data> localFunction;
private final Optional<Consumer<Data>> compensation;
private final List<LocalExceptionSaver<Data>> localExceptionSavers;
private final List<Class<RuntimeException>> rollbackExceptions;

public LocalStep(Consumer<Data> localFunction, Optional<Consumer<Data>> compensation) {
public LocalStep(Consumer<Data> localFunction, Optional<Consumer<Data>> compensation, List<LocalExceptionSaver<Data>> localExceptionSavers, List<Class<RuntimeException>> rollbackExceptions) {
this.localFunction = localFunction;
this.compensation = compensation;
this.localExceptionSavers = localExceptionSavers;
this.rollbackExceptions = rollbackExceptions;
}

@Override
Expand Down Expand Up @@ -51,7 +56,11 @@ public StepOutcome makeStepOutcome(Data data, boolean compensating) {
}
return makeLocalOutcome(Optional.empty());
} catch (RuntimeException e) {
return makeLocalOutcome(Optional.of(e));
localExceptionSavers.stream().filter(saver -> saver.shouldSave(e)).findFirst().ifPresent(saver -> saver.save(data, e));
if (rollbackExceptions.isEmpty() || rollbackExceptions.stream().anyMatch(c -> c.isInstance(e)))
return makeLocalOutcome(Optional.of(e));
else
throw e;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,20 @@

import io.eventuate.tram.sagas.orchestration.SagaDefinition;

import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class LocalStepBuilder<Data> {
private final SimpleSagaDefinitionBuilder<Data> parent;
private final Consumer<Data> localFunction;
private Optional<Consumer<Data>> compensation = Optional.empty();

private final List<LocalExceptionSaver<Data>> localExceptionSavers = new LinkedList<>();
private final List<Class<RuntimeException>> rollbackExceptions = new LinkedList<>();

public LocalStepBuilder(SimpleSagaDefinitionBuilder<Data> parent, Consumer<Data> localFunction) {
this.parent = parent;
this.localFunction = localFunction;
Expand All @@ -22,14 +28,28 @@ public LocalStepBuilder<Data> withCompensation(Consumer<Data> localCompensation)


public StepBuilder<Data> step() {
parent.addStep(new LocalStep<>(localFunction, compensation));
parent.addStep(makeLocalStep());
return new StepBuilder<>(parent);
}

private LocalStep<Data> makeLocalStep() {
return new LocalStep<>(localFunction, compensation, localExceptionSavers, rollbackExceptions);
}

public SagaDefinition<Data> build() {
// TODO - pull up with template method for completing current step
parent.addStep(new LocalStep<>(localFunction, compensation));
parent.addStep(makeLocalStep());
return parent.build();
}

public <E extends RuntimeException> LocalStepBuilder<Data> onException(Class<E> exceptionType, BiConsumer<Data, E> exceptionConsumer) {
rollbackExceptions.add((Class<RuntimeException>)exceptionType);
localExceptionSavers.add(new LocalExceptionSaver<>(exceptionType, (BiConsumer<Data,RuntimeException>)exceptionConsumer));
return this;
}

public <E extends RuntimeException> LocalStepBuilder<Data> onExceptionRollback(Class<E> exceptionType) {
rollbackExceptions.add((Class<RuntimeException>)exceptionType);
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package io.eventuate.tram.sagas.simpledsl.localexceptions;

public class InvalidOrderException extends RuntimeException {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.eventuate.tram.sagas.simpledsl.localexceptions;

import io.eventuate.tram.sagas.orchestration.SagaDefinition;
import io.eventuate.tram.sagas.simpledsl.SimpleSaga;

public class LocalExceptionCreateOrderSaga implements SimpleSaga<LocalExceptionCreateOrderSagaData> {

private final SagaDefinition<LocalExceptionCreateOrderSagaData> sagaDefinition;

public LocalExceptionCreateOrderSaga(LocalExceptionCreateOrderSagaSteps steps) {
this.sagaDefinition =
step()
.invokeLocal(steps::createOrder)
.withCompensation(steps::rejectOrder)
.step()
.invokeParticipant(LocalExceptionCreateOrderSagaData::reserveCredit)
.withCompensation(LocalExceptionCreateOrderSagaData::releaseCredit)
.step()
.invokeParticipant(LocalExceptionCreateOrderSagaData::reserveInventory)
.withCompensationNotification(LocalExceptionCreateOrderSagaData::releaseInventory)
.step()
.invokeLocal(steps::approveOrder)
.onException(InvalidOrderException.class, LocalExceptionCreateOrderSagaData::saveInvalidOrder)
.onExceptionRollback(InvalidOrderException.class)
.step()
.notifyParticipant(LocalExceptionCreateOrderSagaData::fulfillOrder)
.build();
}


@Override
public SagaDefinition<LocalExceptionCreateOrderSagaData> getSagaDefinition() {
return this.sagaDefinition;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.eventuate.tram.sagas.simpledsl.localexceptions;

import io.eventuate.tram.commands.consumer.CommandWithDestination;
import io.eventuate.tram.sagas.simpledsl.ReleaseCreditCommand;
import io.eventuate.tram.sagas.simpledsl.ReserveCreditCommand;
import io.eventuate.tram.sagas.simpledsl.notifications.FulfillOrder;
import io.eventuate.tram.sagas.simpledsl.notifications.ReleaseInventory;
import io.eventuate.tram.sagas.simpledsl.notifications.ReserveInventory;

public class LocalExceptionCreateOrderSagaData {


private boolean invalidOrder = false;

public CommandWithDestination reserveCredit() {
return new CommandWithDestination("customerService", null, new ReserveCreditCommand());
}

public CommandWithDestination releaseCredit() {
return new CommandWithDestination("customerService", null, new ReleaseCreditCommand());
}

public CommandWithDestination reserveInventory() {
return new CommandWithDestination("inventoryService", null, new ReserveInventory());
}

public CommandWithDestination releaseInventory() {
return new CommandWithDestination("inventoryService", null, new ReleaseInventory());
}

public CommandWithDestination fulfillOrder() {
return new CommandWithDestination("fulfillmentService", null, new FulfillOrder());
}

public void saveInvalidOrder(InvalidOrderException e) {
this.invalidOrder = true;
}

public boolean isInvalidOrder() {
return invalidOrder;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.eventuate.tram.sagas.simpledsl.localexceptions;

public interface LocalExceptionCreateOrderSagaSteps {

void createOrder(LocalExceptionCreateOrderSagaData data);
void rejectOrder(LocalExceptionCreateOrderSagaData data);
void approveOrder(LocalExceptionCreateOrderSagaData data);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package io.eventuate.tram.sagas.simpledsl.localexceptions;

import io.eventuate.tram.sagas.simpledsl.ReleaseCreditCommand;
import io.eventuate.tram.sagas.simpledsl.ReserveCreditCommand;
import io.eventuate.tram.sagas.simpledsl.notifications.FulfillOrder;
import io.eventuate.tram.sagas.simpledsl.notifications.ReleaseInventory;
import io.eventuate.tram.sagas.simpledsl.notifications.ReserveInventory;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import static io.eventuate.tram.sagas.testing.SagaUnitTestSupport.given;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;

@RunWith(MockitoJUnitRunner.class)
public class LocalExceptionCreateOrderSagaTest {

@Mock
private LocalExceptionCreateOrderSagaSteps steps;

@Test
public void shouldExecuteAllStepsSuccessfully() {
given().
saga(new LocalExceptionCreateOrderSaga(steps), new LocalExceptionCreateOrderSagaData()).
expect().
command(new ReserveCreditCommand()).
to("customerService").
andGiven().
successReply().
expect().
command(new ReserveInventory()).
to("inventoryService").
andGiven().
successReply().
expect().
notification(new FulfillOrder()).
to("fulfillmentService").
expectCompletedSuccessfully()
;
}

@Test
public void shouldRollbackDueToLocalStepFailure() {
doThrow(new InvalidOrderException()).when(steps).approveOrder(any());
given().
saga(new LocalExceptionCreateOrderSaga(steps), new LocalExceptionCreateOrderSagaData()).
expect().
command(new ReserveCreditCommand()).
to("customerService").
andGiven().
successReply().
expect().
command(new ReserveInventory()).
to("inventoryService").
andGiven().
successReply().
expect().
multiple().
notification(new ReleaseInventory()).
to("inventoryService").
command(new ReleaseCreditCommand()).
to("customerService").
verify().
andGiven().
successReply().
expectRolledBack()
.assertSagaData(sagaData -> assertTrue(sagaData.isInvalidOrder()))
;
}

static class UnexpectedException extends RuntimeException {}

@Test(expected = UnexpectedException.class)
public void shouldFailWithUnexpectedException() {
doThrow(new UnexpectedException()).when(steps).approveOrder(any());
given().
saga(new LocalExceptionCreateOrderSaga(steps), new LocalExceptionCreateOrderSagaData()).
expect().
command(new ReserveCreditCommand()).
to("customerService").
andGiven().
successReply().
expect().
command(new ReserveInventory()).
to("inventoryService").
andGiven().
successReply();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,8 @@ public SagaUnitTestSupport<T> assertSagaData(Consumer<T> sagaDataConsumer) {
return this;
}

public MultipleCommandsExpected multiple() {
return new MultipleCommandsExpected(this);
public MultipleCommandsExpected<T> multiple() {
return new MultipleCommandsExpected<>(this);
}

}

0 comments on commit e47f49c

Please sign in to comment.