diff --git a/akka-javasdk/src/main/java/akka/javasdk/workflow/Workflow.java b/akka-javasdk/src/main/java/akka/javasdk/workflow/Workflow.java index 769d5a8cb..06be3b61e 100644 --- a/akka-javasdk/src/main/java/akka/javasdk/workflow/Workflow.java +++ b/akka-javasdk/src/main/java/akka/javasdk/workflow/Workflow.java @@ -145,19 +145,6 @@ public void _internalSetup(S state) { this.currentState = Optional.ofNullable(state); } - /** - * INTERNAL API - * - * @hidden - */ - @InternalApi - public void _internalClear() { - this.stateHasBeenSet = false; - this.currentState = Optional.empty(); - this.commandContext = Optional.empty(); - this.timerScheduler = Optional.empty(); - } - /** * @return A workflow definition in a form of steps and transitions between them. diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/workflow/ReflectiveWorkflowRouter.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/workflow/ReflectiveWorkflowRouter.scala index 98be5b6fb..c3dbe7781 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/workflow/ReflectiveWorkflowRouter.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/workflow/ReflectiveWorkflowRouter.scala @@ -28,6 +28,7 @@ import akka.javasdk.workflow.CommandContext import akka.javasdk.workflow.Workflow import akka.javasdk.workflow.Workflow.AsyncCallStep import akka.javasdk.workflow.Workflow.Effect.TransitionalEffect +import akka.javasdk.workflow.WorkflowContext import akka.runtime.sdk.spi.BytesPayload import akka.runtime.sdk.spi.SpiWorkflow @@ -56,17 +57,16 @@ object ReflectiveWorkflowRouter { */ @InternalApi class ReflectiveWorkflowRouter[S, W <: Workflow[S]]( - val workflow: W, + workflowContext: WorkflowContext, + instanceFactory: Function[WorkflowContext, W], commandHandlers: Map[String, CommandHandler], serializer: JsonSerializer) { - private def decodeUserState(userState: Option[BytesPayload]): S = + private def decodeUserState(userState: Option[BytesPayload]): Option[S] = userState .collect { case payload if payload.nonEmpty => serializer.fromBytes(payload).asInstanceOf[S] } - // if runtime doesn't have a state to provide, we fall back to user's own defined empty state - .getOrElse(workflow.emptyState()) // in same cases, the runtime may send a message with contentType set to object. // if that's the case, we need to patch the message using the contentType from the expected input class @@ -83,7 +83,11 @@ class ReflectiveWorkflowRouter[S, W <: Workflow[S]]( private def commandHandlerLookup(commandName: String) = commandHandlers.getOrElse( commandName, - throw new HandlerNotFoundException("command", commandName, workflow.getClass, commandHandlers.keySet)) + throw new HandlerNotFoundException( + "command", + commandName, + instanceFactory(workflowContext).getClass, + commandHandlers.keySet)) final def handleCommand( userState: Option[SpiWorkflow.State], @@ -92,9 +96,13 @@ class ReflectiveWorkflowRouter[S, W <: Workflow[S]]( context: CommandContext, timerScheduler: TimerScheduler): CommandResult = { + val workflow = instanceFactory(workflowContext) + val commandEffect = try { - workflow._internalSetup(decodeUserState(userState), context, timerScheduler) + // if runtime doesn't have a state to provide, we fall back to user's own defined empty state + val decodedState = decodeUserState(userState).getOrElse(workflow.emptyState()) + workflow._internalSetup(decodedState, context, timerScheduler) val commandHandler = commandHandlerLookup(commandName) @@ -122,8 +130,6 @@ class ReflectiveWorkflowRouter[S, W <: Workflow[S]]( context.workflowId(), commandName, s"No command handler found for command [$name] on [${workflow.getClass.getName}]") - } finally { - workflow._internalClear(); } CommandResult(commandEffect) @@ -139,49 +145,48 @@ class ReflectiveWorkflowRouter[S, W <: Workflow[S]]( implicit val ec: ExecutionContext = executionContext - try { - workflow._internalSetup(decodeUserState(userState), commandContext, timerScheduler) - workflow.definition().findByName(stepName).toScala match { - case Some(call: AsyncCallStep[_, _, _]) => - val decodedInput = input match { - case Some(inputValue) => decodeInput(inputValue, call.callInputClass) - case None => null // to meet a signature of supplier expressed as a function - } + val workflow = instanceFactory(workflowContext) + // if runtime doesn't have a state to provide, we fall back to user's own defined empty state + val decodedState = decodeUserState(userState).getOrElse(workflow.emptyState()) + workflow._internalSetup(decodedState, commandContext, timerScheduler) - val future = call.callFunc - .asInstanceOf[JFunc[Any, CompletionStage[Any]]] - .apply(decodedInput) - .asScala + workflow.definition().findByName(stepName).toScala match { + case Some(call: AsyncCallStep[_, _, _]) => + val decodedInput = input match { + case Some(inputValue) => decodeInput(inputValue, call.callInputClass) + case None => null // to meet a signature of supplier expressed as a function + } - future.map(serializer.toBytes) + val future = call.callFunc + .asInstanceOf[JFunc[Any, CompletionStage[Any]]] + .apply(decodedInput) + .asScala - case Some(any) => Future.failed(WorkflowStepNotSupported(any.getClass.getSimpleName)) - case None => Future.failed(WorkflowStepNotFound(stepName)) - } - } finally { - workflow._internalClear() - } + future.map(serializer.toBytes) + case Some(any) => Future.failed(WorkflowStepNotSupported(any.getClass.getSimpleName)) + case None => Future.failed(WorkflowStepNotFound(stepName)) + } } final def getNextStep(stepName: String, result: BytesPayload, userState: Option[BytesPayload]): TransitionalResult = { - try { - workflow._internalSetup(decodeUserState(userState)) - workflow.definition().findByName(stepName).toScala match { - case Some(call: AsyncCallStep[_, _, _]) => - val effect = - call.transitionFunc - .asInstanceOf[JFunc[Any, TransitionalEffect[Any]]] - .apply(decodeInput(result, call.transitionInputClass)) + val workflow = instanceFactory(workflowContext) - TransitionalResult(effect) + // if runtime doesn't have a state to provide, we fall back to user's own defined empty state + val decodedState = decodeUserState(userState).getOrElse(workflow.emptyState()) + workflow._internalSetup(decodedState) + workflow.definition().findByName(stepName).toScala match { + case Some(call: AsyncCallStep[_, _, _]) => + val effect = + call.transitionFunc + .asInstanceOf[JFunc[Any, TransitionalEffect[Any]]] + .apply(decodeInput(result, call.transitionInputClass)) - case Some(any) => throw WorkflowStepNotSupported(any.getClass.getSimpleName) - case None => throw WorkflowStepNotFound(stepName) - } - } finally { - workflow._internalClear(); + TransitionalResult(effect) + + case Some(any) => throw WorkflowStepNotSupported(any.getClass.getSimpleName) + case None => throw WorkflowStepNotFound(stepName) } } } diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/workflow/WorkflowImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/workflow/WorkflowImpl.scala index dd3e1c03f..d9a8de788 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/workflow/WorkflowImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/workflow/WorkflowImpl.scala @@ -68,10 +68,11 @@ class WorkflowImpl[S, W <: Workflow[S]]( private val context = new WorkflowContextImpl(workflowId) private val router = - new ReflectiveWorkflowRouter[S, W](instanceFactory(context), componentDescriptor.commandHandlers, serializer) + new ReflectiveWorkflowRouter[S, W](context, instanceFactory, componentDescriptor.commandHandlers, serializer) override def configuration: SpiWorkflow.WorkflowConfig = { - val definition = router.workflow.definition() + val workflow = instanceFactory(context) + val definition = workflow.definition() def toRecovery(sdkRecoverStrategy: SdkRecoverStrategy[_]): SpiWorkflow.RecoverStrategy = { diff --git a/samples/transfer-workflow-compensation/src/main/java/com/example/transfer/application/TransferWorkflow.java b/samples/transfer-workflow-compensation/src/main/java/com/example/transfer/application/TransferWorkflow.java index 5739a7a7e..e442bafa7 100644 --- a/samples/transfer-workflow-compensation/src/main/java/com/example/transfer/application/TransferWorkflow.java +++ b/samples/transfer-workflow-compensation/src/main/java/com/example/transfer/application/TransferWorkflow.java @@ -47,12 +47,9 @@ public WorkflowDef definition() { .asyncCall(Withdraw.class, cmd -> { logger.info("Running withdraw: {}", cmd); - // saving the wallet id in var because it's being used in thenCompose - var fromWalletId = currentState().transfer().from(); - // cancelling the timer in case it was scheduled return timers().cancel("acceptationTimout-" + currentState().transferId()).thenCompose(__ -> - componentClient.forEventSourcedEntity(fromWalletId) + componentClient.forEventSourcedEntity(currentState().transfer().from()) .method(WalletEntity::withdraw) .invokeAsync(cmd)); })