Skip to content

Commit

Permalink
chore: make workflow stateless (#108)
Browse files Browse the repository at this point in the history
* chore: make workflow stateless

* removed superfluous try block
  • Loading branch information
octonato authored Dec 18, 2024
1 parent 109c0e0 commit 2b77124
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 60 deletions.
13 changes: 0 additions & 13 deletions akka-javasdk/src/main/java/akka/javasdk/workflow/Workflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -145,19 +145,6 @@ public void _internalSetup(S state) {
this.currentState = Optional.ofNullable(state);
}

/**
* INTERNAL API
*
* @hidden
*/
@InternalApi
public void _internalClear() {
this.stateHasBeenSet = false;
this.currentState = Optional.empty();
this.commandContext = Optional.empty();
this.timerScheduler = Optional.empty();
}


/**
* @return A workflow definition in a form of steps and transitions between them.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import akka.javasdk.workflow.CommandContext
import akka.javasdk.workflow.Workflow
import akka.javasdk.workflow.Workflow.AsyncCallStep
import akka.javasdk.workflow.Workflow.Effect.TransitionalEffect
import akka.javasdk.workflow.WorkflowContext
import akka.runtime.sdk.spi.BytesPayload
import akka.runtime.sdk.spi.SpiWorkflow

Expand Down Expand Up @@ -56,17 +57,16 @@ object ReflectiveWorkflowRouter {
*/
@InternalApi
class ReflectiveWorkflowRouter[S, W <: Workflow[S]](
val workflow: W,
workflowContext: WorkflowContext,
instanceFactory: Function[WorkflowContext, W],
commandHandlers: Map[String, CommandHandler],
serializer: JsonSerializer) {

private def decodeUserState(userState: Option[BytesPayload]): S =
private def decodeUserState(userState: Option[BytesPayload]): Option[S] =
userState
.collect {
case payload if payload.nonEmpty => serializer.fromBytes(payload).asInstanceOf[S]
}
// if runtime doesn't have a state to provide, we fall back to user's own defined empty state
.getOrElse(workflow.emptyState())

// in same cases, the runtime may send a message with contentType set to object.
// if that's the case, we need to patch the message using the contentType from the expected input class
Expand All @@ -83,7 +83,11 @@ class ReflectiveWorkflowRouter[S, W <: Workflow[S]](
private def commandHandlerLookup(commandName: String) =
commandHandlers.getOrElse(
commandName,
throw new HandlerNotFoundException("command", commandName, workflow.getClass, commandHandlers.keySet))
throw new HandlerNotFoundException(
"command",
commandName,
instanceFactory(workflowContext).getClass,
commandHandlers.keySet))

final def handleCommand(
userState: Option[SpiWorkflow.State],
Expand All @@ -92,9 +96,13 @@ class ReflectiveWorkflowRouter[S, W <: Workflow[S]](
context: CommandContext,
timerScheduler: TimerScheduler): CommandResult = {

val workflow = instanceFactory(workflowContext)

val commandEffect =
try {
workflow._internalSetup(decodeUserState(userState), context, timerScheduler)
// if runtime doesn't have a state to provide, we fall back to user's own defined empty state
val decodedState = decodeUserState(userState).getOrElse(workflow.emptyState())
workflow._internalSetup(decodedState, context, timerScheduler)

val commandHandler = commandHandlerLookup(commandName)

Expand Down Expand Up @@ -122,8 +130,6 @@ class ReflectiveWorkflowRouter[S, W <: Workflow[S]](
context.workflowId(),
commandName,
s"No command handler found for command [$name] on [${workflow.getClass.getName}]")
} finally {
workflow._internalClear();
}

CommandResult(commandEffect)
Expand All @@ -139,49 +145,48 @@ class ReflectiveWorkflowRouter[S, W <: Workflow[S]](

implicit val ec: ExecutionContext = executionContext

try {
workflow._internalSetup(decodeUserState(userState), commandContext, timerScheduler)
workflow.definition().findByName(stepName).toScala match {
case Some(call: AsyncCallStep[_, _, _]) =>
val decodedInput = input match {
case Some(inputValue) => decodeInput(inputValue, call.callInputClass)
case None => null // to meet a signature of supplier expressed as a function
}
val workflow = instanceFactory(workflowContext)
// if runtime doesn't have a state to provide, we fall back to user's own defined empty state
val decodedState = decodeUserState(userState).getOrElse(workflow.emptyState())
workflow._internalSetup(decodedState, commandContext, timerScheduler)

val future = call.callFunc
.asInstanceOf[JFunc[Any, CompletionStage[Any]]]
.apply(decodedInput)
.asScala
workflow.definition().findByName(stepName).toScala match {
case Some(call: AsyncCallStep[_, _, _]) =>
val decodedInput = input match {
case Some(inputValue) => decodeInput(inputValue, call.callInputClass)
case None => null // to meet a signature of supplier expressed as a function
}

future.map(serializer.toBytes)
val future = call.callFunc
.asInstanceOf[JFunc[Any, CompletionStage[Any]]]
.apply(decodedInput)
.asScala

case Some(any) => Future.failed(WorkflowStepNotSupported(any.getClass.getSimpleName))
case None => Future.failed(WorkflowStepNotFound(stepName))
}
} finally {
workflow._internalClear()
}
future.map(serializer.toBytes)

case Some(any) => Future.failed(WorkflowStepNotSupported(any.getClass.getSimpleName))
case None => Future.failed(WorkflowStepNotFound(stepName))
}
}

final def getNextStep(stepName: String, result: BytesPayload, userState: Option[BytesPayload]): TransitionalResult = {

try {
workflow._internalSetup(decodeUserState(userState))
workflow.definition().findByName(stepName).toScala match {
case Some(call: AsyncCallStep[_, _, _]) =>
val effect =
call.transitionFunc
.asInstanceOf[JFunc[Any, TransitionalEffect[Any]]]
.apply(decodeInput(result, call.transitionInputClass))
val workflow = instanceFactory(workflowContext)

TransitionalResult(effect)
// if runtime doesn't have a state to provide, we fall back to user's own defined empty state
val decodedState = decodeUserState(userState).getOrElse(workflow.emptyState())
workflow._internalSetup(decodedState)
workflow.definition().findByName(stepName).toScala match {
case Some(call: AsyncCallStep[_, _, _]) =>
val effect =
call.transitionFunc
.asInstanceOf[JFunc[Any, TransitionalEffect[Any]]]
.apply(decodeInput(result, call.transitionInputClass))

case Some(any) => throw WorkflowStepNotSupported(any.getClass.getSimpleName)
case None => throw WorkflowStepNotFound(stepName)
}
} finally {
workflow._internalClear();
TransitionalResult(effect)

case Some(any) => throw WorkflowStepNotSupported(any.getClass.getSimpleName)
case None => throw WorkflowStepNotFound(stepName)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,11 @@ class WorkflowImpl[S, W <: Workflow[S]](
private val context = new WorkflowContextImpl(workflowId)

private val router =
new ReflectiveWorkflowRouter[S, W](instanceFactory(context), componentDescriptor.commandHandlers, serializer)
new ReflectiveWorkflowRouter[S, W](context, instanceFactory, componentDescriptor.commandHandlers, serializer)

override def configuration: SpiWorkflow.WorkflowConfig = {
val definition = router.workflow.definition()
val workflow = instanceFactory(context)
val definition = workflow.definition()

def toRecovery(sdkRecoverStrategy: SdkRecoverStrategy[_]): SpiWorkflow.RecoverStrategy = {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,9 @@ public WorkflowDef<TransferState> definition() {
.asyncCall(Withdraw.class, cmd -> {
logger.info("Running withdraw: {}", cmd);

// saving the wallet id in var because it's being used in thenCompose
var fromWalletId = currentState().transfer().from();

// cancelling the timer in case it was scheduled
return timers().cancel("acceptationTimout-" + currentState().transferId()).thenCompose(__ ->
componentClient.forEventSourcedEntity(fromWalletId)
componentClient.forEventSourcedEntity(currentState().transfer().from())
.method(WalletEntity::withdraw)
.invokeAsync(cmd));
})
Expand Down

0 comments on commit 2b77124

Please sign in to comment.