diff --git a/akka/src/main/scala/io/kagera/akka/actor/PetriNetInstance.scala b/akka/src/main/scala/io/kagera/akka/actor/PetriNetInstance.scala index 75aa6055..55e18b33 100644 --- a/akka/src/main/scala/io/kagera/akka/actor/PetriNetInstance.scala +++ b/akka/src/main/scala/io/kagera/akka/actor/PetriNetInstance.scala @@ -28,7 +28,7 @@ object PetriNetInstance { def petriNetInstancePersistenceId(processId: String): String = s"process-$processId" - def instanceState[S](instance: Instance[S]): InstanceState[S] = { + def instanceState[S, T <: Transition[_, _, S]](instance: Instance[S, T]): InstanceState[S] = { val failures = instance.failedJobs.map { e => e.transitionId -> PetriNetInstanceProtocol.ExceptionState( e.consecutiveFailureCount, @@ -40,20 +40,23 @@ object PetriNetInstance { InstanceState[S](instance.sequenceNr, instance.marking, instance.state, failures) } - def props[S](topology: ExecutablePetriNet[S], settings: Settings = defaultSettings): Props = - Props(new PetriNetInstance[S](topology, settings, new TransitionExecutorImpl[IO, S](topology))) + def props[S, T <: Transition[Any, _, S]](topology: ExecutablePetriNet[S, T], settings: Settings = defaultSettings)( + implicit executorFactory: TransitionExecutorFactory.WithInputOutputState[IO, T, Any, _, S] + ): Props = + Props(new PetriNetInstance[S, T](topology, settings, new TransitionExecutorImpl[IO, T](topology))) } /** * This actor is responsible for maintaining the state of a single petri net instance. */ -class PetriNetInstance[S]( - override val topology: ExecutablePetriNet[S], +class PetriNetInstance[S, T <: Transition[Any, _, S]]( + override val topology: ExecutablePetriNet[S, T], val settings: Settings, - executor: TransitionExecutor[IO, S] -) extends PersistentActor + executor: TransitionExecutor[IO, T] +)(implicit executorFactory: TransitionExecutorFactory.WithInputOutputState[IO, T, Any, _, S]) + extends PersistentActor with ActorLogging - with PetriNetInstanceRecovery[S] { + with PetriNetInstanceRecovery[S, T] { import PetriNetInstance._ @@ -68,7 +71,7 @@ class PetriNetInstance[S]( def uninitialized: Receive = { case msg @ Initialize(marking, state) => log.debug(s"Received message: {}", msg) - val uninitialized = Instance.uninitialized[S](topology) + val uninitialized = Instance.uninitialized[S, T](topology) persistEvent(uninitialized, InitializedEvent(marking, state.asInstanceOf[S])) { (applyEvent(uninitialized) _) .andThen(step) @@ -79,7 +82,7 @@ class PetriNetInstance[S]( context.stop(context.self) } - def running(instance: Instance[S]): Receive = { + def running(instance: Instance[S, T]): Receive = { case IdleStop(n) if n == instance.sequenceNr && instance.activeJobs.isEmpty => context.stop(context.self) @@ -105,7 +108,7 @@ class PetriNetInstance[S]( val updatedInstance = applyEvent(instance)(e) - def updateAndRespond(instance: Instance[S]) = { + def updateAndRespond(instance: Instance[S, T]) = { sender() ! TransitionFailed(transitionId, consume, input, reason, strategy) context become running(instance) } @@ -116,7 +119,9 @@ class PetriNetInstance[S]( s"Scheduling a retry of transition '${topology.transitions.getById(transitionId)}' in $delay milliseconds" ) val originalSender = sender() - system.scheduler.scheduleOnce(delay milliseconds) { executeJob(updatedInstance.jobs(jobId), originalSender) } + system.scheduler.scheduleOnce(delay milliseconds) { + executeJob(updatedInstance.jobs(jobId), originalSender) + } updateAndRespond(applyEvent(instance)(e)) case _ => persistEvent(instance, e)((applyEvent(instance) _).andThen(updateAndRespond _)) @@ -125,7 +130,7 @@ class PetriNetInstance[S]( case msg @ FireTransition(id, input, correlationId) => log.debug(s"Received message: {}", msg) - fireTransitionById[S](id, input).run(instance).value match { + fireTransitionById[S, T](id, input).run(instance).value match { case (updatedInstance, Right(job)) => executeJob(job, sender()) context become running(updatedInstance) @@ -138,7 +143,7 @@ class PetriNetInstance[S]( } // TODO remove side effecting here - def step(instance: Instance[S]): Instance[S] = { + def step(instance: Instance[S, T]): Instance[S, T] = { fireAllEnabledTransitions.run(instance).value match { case (updatedInstance, jobs) => if (jobs.isEmpty && updatedInstance.activeJobs.isEmpty) @@ -153,8 +158,10 @@ class PetriNetInstance[S]( } } - def executeJob[E](job: Job[S, E], originalSender: ActorRef) = - runJobAsync(job, executor)(settings.evaluationStrategy).unsafeToFuture().pipeTo(context.self)(originalSender) + def executeJob(job: Job[S, T], originalSender: ActorRef) = + runJobAsync[S, T](job, executor)(settings.evaluationStrategy, executorFactory) + .unsafeToFuture() + .pipeTo(context.self)(originalSender) - override def onRecoveryCompleted(instance: Instance[S]) = step(instance) + override def onRecoveryCompleted(instance: Instance[S, T]) = step(instance) } diff --git a/akka/src/main/scala/io/kagera/akka/actor/PetriNetInstanceApi.scala b/akka/src/main/scala/io/kagera/akka/actor/PetriNetInstanceApi.scala index f446d312..2fd38f32 100644 --- a/akka/src/main/scala/io/kagera/akka/actor/PetriNetInstanceApi.scala +++ b/akka/src/main/scala/io/kagera/akka/actor/PetriNetInstanceApi.scala @@ -42,19 +42,26 @@ class QueuePushingActor[E](queue: SourceQueueWithComplete[E], takeWhile: Any => object PetriNetInstanceApi { - def hasAutomaticTransitions[S](topology: ExecutablePetriNet[S]): InstanceState[S] => Boolean = state => { + def hasAutomaticTransitions[S, T <: Transition[_, _, S]]( + topology: ExecutablePetriNet[S, T] + ): InstanceState[S] => Boolean = state => { state.marking.keySet .map(p => topology.outgoingTransitions(p)) - .foldLeft(Set.empty[Transition[_, _, _]]) { case (result, transitions) => + .foldLeft(Set.empty[T]) { case (result, transitions) => result ++ transitions } .exists(isEnabledInState(topology, state)) } - def isEnabledInState[S](topology: ExecutablePetriNet[S], state: InstanceState[S])(t: Transition[_, _, _]): Boolean = + def isEnabledInState[S, T <: Transition[_, _, S]](topology: ExecutablePetriNet[S, T], state: InstanceState[S])( + t: T + ): Boolean = t.isAutomated && !state.hasFailed(t.id) && topology.isEnabledInMarking(state.marking.multiplicities)(t) - def takeWhileNotFailed[S](topology: ExecutablePetriNet[S], waitForRetries: Boolean): Any => Boolean = e => + def takeWhileNotFailed[S, T <: Transition[_, _, S]]( + topology: ExecutablePetriNet[S, T], + waitForRetries: Boolean + ): Any => Boolean = e => e match { case e: TransitionFired[S] => hasAutomaticTransitions(topology)(e.result) case TransitionFailed(_, _, _, _, RetryWithDelay(delay)) => waitForRetries @@ -75,7 +82,7 @@ object PetriNetInstanceApi { /** * Contains some methods to interact with a petri net instance actor. */ -class PetriNetInstanceApi[S](topology: ExecutablePetriNet[S], actor: ActorRef)(implicit +class PetriNetInstanceApi[S, T <: Transition[_, _, S]](topology: ExecutablePetriNet[S, T], actor: ActorRef)(implicit actorSystem: ActorSystem, materializer: Materializer ) { diff --git a/akka/src/main/scala/io/kagera/akka/actor/PetriNetInstanceRecovery.scala b/akka/src/main/scala/io/kagera/akka/actor/PetriNetInstanceRecovery.scala index 6fd79bc9..0e51b3a0 100644 --- a/akka/src/main/scala/io/kagera/akka/actor/PetriNetInstanceRecovery.scala +++ b/akka/src/main/scala/io/kagera/akka/actor/PetriNetInstanceRecovery.scala @@ -1,30 +1,30 @@ package io.kagera.akka.actor import akka.persistence.{ PersistentActor, RecoveryCompleted } -import io.kagera.api.colored.ExecutablePetriNet +import io.kagera.api.colored.{ ExecutablePetriNet, Transition } import io.kagera.execution.EventSourcing._ import io.kagera.execution.{ EventSourcing, Instance } import io.kagera.persistence.{ messages, Serialization } -trait PetriNetInstanceRecovery[S] { +trait PetriNetInstanceRecovery[S, T <: Transition[_, _, S]] { this: PersistentActor => - def topology: ExecutablePetriNet[S] + def topology: ExecutablePetriNet[S, T] implicit val system = context.system val serializer = new Serialization(new AkkaObjectSerializer(context.system)) - def onRecoveryCompleted(state: Instance[S]) + def onRecoveryCompleted(state: Instance[S, T]): Unit - def applyEvent(i: Instance[S])(e: Event): Instance[S] = EventSourcing.applyEvent(e).runS(i).value + def applyEvent(i: Instance[S, T])(e: Event): Instance[S, T] = EventSourcing.applyEvent[S, T](e).runS(i).value - def persistEvent[T, E <: Event](instance: Instance[S], e: E)(fn: E => T): Unit = { + def persistEvent[R, E <: Event](instance: Instance[S, T], e: E)(fn: E => R): Unit = { val serializedEvent = serializer.serializeEvent(e)(instance) persist(serializedEvent) { persisted => fn.apply(e) } } - private var recoveringState: Instance[S] = Instance.uninitialized[S](topology) + private var recoveringState: Instance[S, T] = Instance.uninitialized[S, T](topology) private def applyToRecoveringState(e: AnyRef) = { val deserializedEvent = serializer.deserializeEvent(e)(recoveringState) diff --git a/akka/src/main/scala/io/kagera/akka/query/PetriNetQuery.scala b/akka/src/main/scala/io/kagera/akka/query/PetriNetQuery.scala index 8c60b93d..c0428849 100644 --- a/akka/src/main/scala/io/kagera/akka/query/PetriNetQuery.scala +++ b/akka/src/main/scala/io/kagera/akka/query/PetriNetQuery.scala @@ -5,18 +5,18 @@ import akka.actor.ActorSystem import akka.persistence.query.scaladsl._ import akka.stream.scaladsl._ import io.kagera.akka.actor.{ AkkaObjectSerializer, PetriNetInstance } -import io.kagera.api.colored.ExecutablePetriNet +import io.kagera.api.colored.{ ExecutablePetriNet, Transition } import io.kagera.execution.EventSourcing._ import io.kagera.execution.Instance import io.kagera.persistence.Serialization -trait PetriNetQuery[S] { +trait PetriNetQuery[S, T <: Transition[_, _, S]] { def readJournal: ReadJournal with CurrentEventsByPersistenceIdQuery - def eventsForInstance(instanceId: String, topology: ExecutablePetriNet[S])(implicit + def eventsForInstance(instanceId: String, topology: ExecutablePetriNet[S, T])(implicit actorSystem: ActorSystem - ): (Source[(Instance[S], Event), NotUsed]) = { + ): (Source[(Instance[S, T], Event), NotUsed]) = { val serializer = new Serialization(new AkkaObjectSerializer(actorSystem)) @@ -24,11 +24,11 @@ trait PetriNetQuery[S] { val src = readJournal.currentEventsByPersistenceId(persistentId, 0, Long.MaxValue) src - .scan[(Instance[S], Event)]((Instance.uninitialized(topology), null.asInstanceOf[Event])) { + .scan[(Instance[S, T], Event)]((Instance.uninitialized(topology), null.asInstanceOf[Event])) { case ((instance, prev), e) => val event = e.event.asInstanceOf[AnyRef] val deserializedEvent = serializer.deserializeEvent(event)(instance) - val updatedInstance = applyEvent(deserializedEvent).runS(instance).value + val updatedInstance = applyEvent[S, T](deserializedEvent).runS(instance).value (updatedInstance, deserializedEvent) } .drop(1) diff --git a/akka/src/main/scala/io/kagera/persistence/Serialization.scala b/akka/src/main/scala/io/kagera/persistence/Serialization.scala index 879d1068..211dce59 100644 --- a/akka/src/main/scala/io/kagera/persistence/Serialization.scala +++ b/akka/src/main/scala/io/kagera/persistence/Serialization.scala @@ -43,16 +43,17 @@ class Serialization(serializer: ObjectSerializer) { * De-serializes a persistence.messages.Event to a EvenSourcing.Event. An Instance is required to 'wire' or * 'reference' the message back into context. */ - def deserializeEvent[S](event: AnyRef): Instance[S] => EventSourcing.Event = event match { - case e: messages.Initialized => deserializeInitialized(e) - case e: messages.TransitionFired => deserializeTransitionFired(e) - case e: messages.TransitionFailed => deserializeTransitionFailed(e) - } + def deserializeEvent[S, T <: Transition[_, _, S]](event: AnyRef): Instance[S, T] => EventSourcing.Event = + event match { + case e: messages.Initialized => deserializeInitialized(e) + case e: messages.TransitionFired => deserializeTransitionFired(e) + case e: messages.TransitionFailed => deserializeTransitionFailed(e) + } /** * Serializes an EventSourcing.Event to a persistence.messages.Event. */ - def serializeEvent[S](e: EventSourcing.Event): Instance[S] => AnyRef = + def serializeEvent[S, T <: Transition[_, _, S]](e: EventSourcing.Event): Instance[S, T] => AnyRef = instance => e match { case e: InitializedEvent => serializeInitialized(e) @@ -70,7 +71,10 @@ class Serialization(serializer: ObjectSerializer) { } } - private def deserializeProducedMarking[S](instance: Instance[S], produced: Seq[messages.ProducedToken]): Marking = { + private def deserializeProducedMarking[S, T <: Transition[_, _, S]]( + instance: Instance[S, T], + produced: Seq[messages.ProducedToken] + ): Marking = { produced.foldLeft(Marking.empty) { case (accumulated, messages.ProducedToken(Some(placeId), Some(tokenId), Some(count), data, _)) => val place = instance.process.places.getById(placeId) @@ -104,7 +108,10 @@ class Serialization(serializer: ObjectSerializer) { } } - private def deserializeConsumedMarking[S](instance: Instance[S], e: messages.TransitionFired): Marking = { + private def deserializeConsumedMarking[S, T <: Transition[_, _, S]]( + instance: Instance[S, T], + e: messages.TransitionFired + ): Marking = { e.consumed.foldLeft(Marking.empty) { case (accumulated, messages.ConsumedToken(Some(placeId), Some(tokenId), Some(count), _)) => val place = instance.marking.keySet.getById(placeId) @@ -114,7 +121,9 @@ class Serialization(serializer: ObjectSerializer) { } } - private def deserializeInitialized[S](e: messages.Initialized)(instance: Instance[S]): InitializedEvent = { + private def deserializeInitialized[S, T <: Transition[_, _, S]]( + e: messages.Initialized + )(instance: Instance[S, T]): InitializedEvent = { val initialMarking = deserializeProducedMarking(instance, e.initialMarking) val initialState = e.initialState.map(serializer.deserializeObject).getOrElse(BoxedUnit.UNIT) InitializedEvent(initialMarking, initialState) @@ -126,31 +135,32 @@ class Serialization(serializer: ObjectSerializer) { messages.Initialized(initialMarking, initialState) } - private def deserializeTransitionFailed[S](e: messages.TransitionFailed): Instance[S] => TransitionFailedEvent = { - instance => - val jobId = e.jobId.getOrElse(missingFieldException("job_id")) - val transitionId = e.transitionId.getOrElse(missingFieldException("transition_id")) - val timeStarted = e.timeStarted.getOrElse(missingFieldException("time_started")) - val timeFailed = e.timeFailed.getOrElse(missingFieldException("time_failed")) - val input = e.inputData.map(serializer.deserializeObject) - val failureReason = e.failureReason.getOrElse("") - val failureStrategy = e.failureStrategy.getOrElse(missingFieldException("time_failed")) match { - case FailureStrategy(Some(StrategyType.BLOCK_TRANSITION), _, _) => BlockTransition - case FailureStrategy(Some(StrategyType.BLOCK_ALL), _, _) => Fatal - case FailureStrategy(Some(StrategyType.RETRY), Some(delay), _) => RetryWithDelay(delay) - case other @ _ => throw new IllegalStateException(s"Invalid failure strategy: $other") - } + private def deserializeTransitionFailed[S, T <: Transition[_, _, S]]( + e: messages.TransitionFailed + ): Instance[S, T] => TransitionFailedEvent = { instance => + val jobId = e.jobId.getOrElse(missingFieldException("job_id")) + val transitionId = e.transitionId.getOrElse(missingFieldException("transition_id")) + val timeStarted = e.timeStarted.getOrElse(missingFieldException("time_started")) + val timeFailed = e.timeFailed.getOrElse(missingFieldException("time_failed")) + val input = e.inputData.map(serializer.deserializeObject) + val failureReason = e.failureReason.getOrElse("") + val failureStrategy = e.failureStrategy.getOrElse(missingFieldException("time_failed")) match { + case FailureStrategy(Some(StrategyType.BLOCK_TRANSITION), _, _) => BlockTransition + case FailureStrategy(Some(StrategyType.BLOCK_ALL), _, _) => Fatal + case FailureStrategy(Some(StrategyType.RETRY), Some(delay), _) => RetryWithDelay(delay) + case other @ _ => throw new IllegalStateException(s"Invalid failure strategy: $other") + } - TransitionFailedEvent( - jobId, - transitionId, - timeStarted, - timeFailed, - Marking.empty, - None, - failureReason, - failureStrategy - ) + TransitionFailedEvent( + jobId, + transitionId, + timeStarted, + timeFailed, + Marking.empty, + None, + failureReason, + failureStrategy + ) } private def serializeTransitionFailed(e: TransitionFailedEvent): messages.TransitionFailed = { @@ -188,19 +198,20 @@ class Serialization(serializer: ObjectSerializer) { ) } - private def deserializeTransitionFired[S](e: messages.TransitionFired): Instance[S] => TransitionFiredEvent = - instance => { + private def deserializeTransitionFired[S, T <: Transition[_, _, S]]( + e: messages.TransitionFired + ): Instance[S, T] => TransitionFiredEvent = instance => { - val consumed: Marking = deserializeConsumedMarking(instance, e) - val produced: Marking = deserializeProducedMarking(instance, e.produced) + val consumed: Marking = deserializeConsumedMarking(instance, e) + val produced: Marking = deserializeProducedMarking(instance, e.produced) - val data = e.data.map(serializer.deserializeObject) + val data = e.data.map(serializer.deserializeObject) - val transitionId = e.transitionId.getOrElse(missingFieldException("transition_id")) - val jobId = e.jobId.getOrElse(missingFieldException("job_id")) - val timeStarted = e.timeStarted.getOrElse(missingFieldException("time_started")) - val timeCompleted = e.timeCompleted.getOrElse(missingFieldException("time_completed")) + val transitionId = e.transitionId.getOrElse(missingFieldException("transition_id")) + val jobId = e.jobId.getOrElse(missingFieldException("job_id")) + val timeStarted = e.timeStarted.getOrElse(missingFieldException("time_started")) + val timeCompleted = e.timeCompleted.getOrElse(missingFieldException("time_completed")) - TransitionFiredEvent(jobId, transitionId, timeStarted, timeCompleted, consumed, produced, data) - } + TransitionFiredEvent(jobId, transitionId, timeStarted, timeCompleted, consumed, produced, data) + } } diff --git a/akka/src/test/scala/io/kagera/akka/PetriNetInstanceApiSpec.scala b/akka/src/test/scala/io/kagera/akka/PetriNetInstanceApiSpec.scala index 1afe0c05..bf8e2c5e 100644 --- a/akka/src/test/scala/io/kagera/akka/PetriNetInstanceApiSpec.scala +++ b/akka/src/test/scala/io/kagera/akka/PetriNetInstanceApiSpec.scala @@ -5,7 +5,8 @@ import akka.stream.ActorMaterializer import akka.stream.scaladsl.{ Sink, Source } import io.kagera.akka.actor.PetriNetInstanceApi import io.kagera.akka.actor.PetriNetInstanceProtocol._ -import org.scalatest.matchers.should.Matchers._ +import io.kagera.api.colored.dsl.SequenceNetTransition +import org.scalatest.Matchers._ import scala.concurrent.duration._ import scala.concurrent.{ Await, ExecutionContext } @@ -27,12 +28,12 @@ class PetriNetInstanceApiSpec extends AkkaTestBase { transition(automated = true)(_ => Added(3)) ) - val actor = PetriNetInstanceSpec.createPetriNetActor[Set[Int]](petriNet) + val actor = PetriNetInstanceSpec.createPetriNetActor(petriNet) actor ! Initialize(initialMarking, Set.empty) expectMsgClass(classOf[Initialized[_]]) - val api = new PetriNetInstanceApi(petriNet, actor) + val api = new PetriNetInstanceApi[Set[Int], SequenceNetTransition[Set[Int], Event]](petriNet, actor) val source: Source[TransitionResponse, NotUsed] = api.askAndCollectAll(FireTransition(1, ())) val responses = Await.result(source.runWith(Sink.seq[TransitionResponse]), waitTimeout) @@ -49,8 +50,8 @@ class PetriNetInstanceApiSpec extends AkkaTestBase { override val sequence = Seq(transition()(_ => Added(1))) - val actor = PetriNetInstanceSpec.createPetriNetActor[Set[Int]](petriNet) - val api = new PetriNetInstanceApi(petriNet, actor) + val actor = PetriNetInstanceSpec.createPetriNetActor(petriNet) + val api = new PetriNetInstanceApi[Set[Int], SequenceNetTransition[Set[Int], Event]](petriNet, actor) val source: Source[TransitionResponse, NotUsed] = api.askAndCollectAll(FireTransition(1, ())) val responses = Await.result(source.runWith(Sink.seq[TransitionResponse]), waitTimeout) diff --git a/akka/src/test/scala/io/kagera/akka/PetriNetInstanceSpec.scala b/akka/src/test/scala/io/kagera/akka/PetriNetInstanceSpec.scala index 11d79abc..c131e1ce 100644 --- a/akka/src/test/scala/io/kagera/akka/PetriNetInstanceSpec.scala +++ b/akka/src/test/scala/io/kagera/akka/PetriNetInstanceSpec.scala @@ -2,6 +2,7 @@ package io.kagera.akka import java.util.UUID import akka.actor.{ ActorSystem, PoisonPill, Terminated } +import cats.effect.IO import io.kagera.akka.PetriNetInstanceSpec._ import io.kagera.akka.actor.PetriNetInstance import io.kagera.akka.actor.PetriNetInstance.Settings @@ -16,11 +17,19 @@ import scala.concurrent.ExecutionContext import scala.concurrent.duration._ object PetriNetInstanceSpec { - - def createPetriNetActor[S](petriNet: ExecutablePetriNet[S], processId: String = UUID.randomUUID().toString)(implicit - system: ActorSystem - ) = - system.actorOf(PetriNetInstance.props(petriNet), processId) + implicit val sequenceNetTransitionExecutorFactoryImp + : TransitionExecutorFactory.WithInputOutputState[IO, SequenceNetTransition[Set[Int], Event], Any, _, Set[Int]] = + SequenceNetTransitionExecutorFactory + .sequenceNetTransitionExecutorFactory[IO, Set[Int], Event] + .asInstanceOf[TransitionExecutorFactory.WithInputOutputState[IO, SequenceNetTransition[ + Set[Int], + Event + ], Any, _, Set[Int]]] + def createPetriNetActor( + petriNet: ExecutablePetriNet[Set[Int], SequenceNetTransition[Set[Int], Event]], + processId: String = UUID.randomUUID().toString + )(implicit system: ActorSystem) = + system.actorOf(PetriNetInstance.props[Set[Int], SequenceNetTransition[Set[Int], Event]](petriNet), processId) } class PetriNetInstanceSpec extends AkkaTestBase { @@ -31,7 +40,7 @@ class PetriNetInstanceSpec extends AkkaTestBase { override val sequence = Seq(transition()(_ => Added(1)), transition()(_ => Added(2))) - val actor = createPetriNetActor[Set[Int]](petriNet) + val actor = createPetriNetActor(petriNet) actor ! Initialize(initialMarking, Set(1, 2, 3)) expectMsg(Initialized(initialMarking, Set(1, 2, 3))) @@ -41,7 +50,7 @@ class PetriNetInstanceSpec extends AkkaTestBase { override val sequence = Seq(transition()(_ => Added(1)), transition()(_ => Added(2))) - val actor = createPetriNetActor[Set[Int]](petriNet) + val actor = createPetriNetActor(petriNet) actor ! GetState expectMsgClass(classOf[IllegalCommand]) @@ -51,7 +60,7 @@ class PetriNetInstanceSpec extends AkkaTestBase { override val sequence = Seq(transition()(_ => Added(1)), transition()(_ => Added(2))) - val actor = createPetriNetActor[Set[Int]](petriNet) + val actor = createPetriNetActor(petriNet) actor ! Initialize(initialMarking, Set(1, 2, 3)) expectMsgClass(classOf[Initialized[_]]) @@ -68,7 +77,7 @@ class PetriNetInstanceSpec extends AkkaTestBase { transition()(_ => throw new RuntimeException("t2 failed!")) ) - val actor = createPetriNetActor[Set[Int]](petriNet) + val actor = createPetriNetActor(petriNet) actor ! Initialize(initialMarking, Set.empty) expectMsgClass(classOf[Initialized[_]]) @@ -83,7 +92,7 @@ class PetriNetInstanceSpec extends AkkaTestBase { override val sequence = Seq(transition()(_ => throw new RuntimeException("t1 failed!")), transition()(_ => Added(2))) - val actor = createPetriNetActor[Set[Int]](petriNet) + val actor = createPetriNetActor(petriNet) actor ! Initialize(initialMarking, Set.empty) expectMsgClass(classOf[Initialized[_]]) @@ -102,7 +111,7 @@ class PetriNetInstanceSpec extends AkkaTestBase { override val sequence = Seq(transition()(_ => Added(1)), transition()(_ => Added(2))) - val actor = createPetriNetActor[Set[Int]](petriNet) + val actor = createPetriNetActor(petriNet) actor ! Initialize(initialMarking, Set.empty) expectMsgClass(classOf[Initialized[_]]) @@ -128,7 +137,7 @@ class PetriNetInstanceSpec extends AkkaTestBase { val id = UUID.randomUUID() - val actor = createPetriNetActor[Set[Int]](petriNet) + val actor = createPetriNetActor(petriNet) actor ! Initialize(initialMarking, Set.empty) expectMsgClass(classOf[Initialized[_]]) @@ -153,7 +162,7 @@ class PetriNetInstanceSpec extends AkkaTestBase { val actorName = java.util.UUID.randomUUID().toString - val actor = createPetriNetActor[Set[Int]](petriNet, actorName) + val actor = createPetriNetActor(petriNet, actorName) actor ! Initialize(initialMarking, Set.empty) expectMsgClass(classOf[Initialized[_]]) @@ -178,7 +187,7 @@ class PetriNetInstanceSpec extends AkkaTestBase { Thread.sleep(100) // create a new actor with the same persistent identifier - val newActor = createPetriNetActor[Set[Int]](petriNet, actorName) + val newActor = createPetriNetActor(petriNet, actorName) newActor ! GetState @@ -195,7 +204,7 @@ class PetriNetInstanceSpec extends AkkaTestBase { val actorName = java.util.UUID.randomUUID().toString - val actor = createPetriNetActor[Set[Int]](petriNet, actorName) + val actor = createPetriNetActor(petriNet, actorName) actor ! Initialize(initialMarking, Set.empty) expectMsgClass(classOf[Initialized[_]]) @@ -210,7 +219,7 @@ class PetriNetInstanceSpec extends AkkaTestBase { expectMsgClass(classOf[Terminated]) // create a new actor with the same persistent identifier - val newActor = createPetriNetActor[Set[Int]](petriNet, actorName) + val newActor = createPetriNetActor(petriNet, actorName) // TODO assert t2 is not fired again using mocks @@ -232,7 +241,7 @@ class PetriNetInstanceSpec extends AkkaTestBase { Seq(transition(automated = false)(_ => Added(1)), transition(automated = false)(_ => Added(2))) val actor = system.actorOf( - props = PetriNetInstance.props(petriNet, customSettings), + props = PetriNetInstance.props[Set[Int], SequenceNetTransition[Set[Int], Event]](petriNet, customSettings), name = java.util.UUID.randomUUID().toString ) diff --git a/api/src/main/scala/io/kagera/api/colored/ColoredTokenGame.scala b/api/src/main/scala/io/kagera/api/colored/ColoredTokenGame.scala index dd5efe71..0303bcaf 100644 --- a/api/src/main/scala/io/kagera/api/colored/ColoredTokenGame.scala +++ b/api/src/main/scala/io/kagera/api/colored/ColoredTokenGame.scala @@ -3,13 +3,13 @@ package io.kagera.api.colored import io.kagera.api._ import io.kagera.api.multiset._ -trait ColoredTokenGame extends TokenGame[Place[_], Transition[_, _, _], Marking] { - this: ColoredPetriNet => +trait ColoredTokenGame[T] extends TokenGame[Place[_], T, Marking] { + this: ColoredPetriNet[T] => - override def enabledParameters(m: Marking): Map[Transition[_, _, _], Iterable[Marking]] = + override def enabledParameters(m: Marking): Map[T, Iterable[Marking]] = enabledTransitions(m).view.map(t => t -> consumableMarkings(m)(t)).toMap - def consumableMarkings(marking: Marking)(t: Transition[_, _, _]): Iterable[Marking] = { + def consumableMarkings(marking: Marking)(t: T): Iterable[Marking] = { // TODO this is not the most efficient, should break early when consumable tokens < edge weight val consumable = inMarking(t).map { case (place, count) => (place, count, consumableTokens(marking, place, t)) @@ -28,7 +28,7 @@ trait ColoredTokenGame extends TokenGame[Place[_], Transition[_, _, _], Marking] } } - def consumableTokens[C](marking: Marking, p: Place[C], t: Transition[_, _, _]): MultiSet[C] = { + def consumableTokens[C](marking: Marking, p: Place[C], t: T): MultiSet[C] = { val pn = this val edge = pn.getEdge(p, t).get @@ -40,6 +40,6 @@ trait ColoredTokenGame extends TokenGame[Place[_], Transition[_, _, _], Marking] } // TODO optimize, no need to process all transitions - override def enabledTransitions(marking: Marking): Set[Transition[_, _, _]] = + override def enabledTransitions(marking: Marking): Set[T] = transitions.filter(t => consumableMarkings(marking)(t).nonEmpty) } diff --git a/api/src/main/scala/io/kagera/api/colored/Generators.scala b/api/src/main/scala/io/kagera/api/colored/Generators.scala index 2c5ba9c7..7c4422bb 100644 --- a/api/src/main/scala/io/kagera/api/colored/Generators.scala +++ b/api/src/main/scala/io/kagera/api/colored/Generators.scala @@ -7,14 +7,14 @@ import scala.concurrent.ExecutionContext object Generators { object Uncolored { - def sequence(nrOfSteps: Int, automated: Boolean = false): ExecutablePetriNet[Unit] = { + def sequence(nrOfSteps: Int, automated: Boolean = false): ExecutablePetriNet[Unit, Transition[_, _, _]] = { val transitions = (1 to nrOfSteps).map(i => nullTransition(id = i, automated = automated)) val places = (1 to (nrOfSteps - 1)).map(i => Place[Unit](id = i)) val tpedges = transitions.zip(places).map { case (t, p) => arc(t, p, 1) } val ptedges = places.zip(transitions.tail).map { case (p, t) => arc(p, t, 1) } - process[Unit]((tpedges ++ ptedges): _*) + process((tpedges ++ ptedges): _*) } } } diff --git a/api/src/main/scala/io/kagera/api/colored/Transition.scala b/api/src/main/scala/io/kagera/api/colored/Transition.scala index 11a36138..09011c51 100644 --- a/api/src/main/scala/io/kagera/api/colored/Transition.scala +++ b/api/src/main/scala/io/kagera/api/colored/Transition.scala @@ -1,23 +1,23 @@ package io.kagera.api.colored -import cats.ApplicativeError -import cats.effect.Sync +import cats.{ Applicative, Functor } import io.kagera.api.colored.ExceptionStrategy.BlockTransition import io.kagera.api.multiset.MultiSet import scala.concurrent.duration.Duration +import scala.reflect.ClassTag /** * A transition in a Colored Petri Net * - * @tparam Input + * @tparam I * The input type of the transition, the type of value that is required as input - * @tparam Output + * @tparam O * The output type of the transition, the type of value that this transition 'emits' or 'produces' - * @tparam State + * @tparam S * The type of state the transition closes over. */ -trait Transition[Input, Output, State] { +trait Transition[-I, O, S] { /** * The unique identifier of this transition. @@ -59,36 +59,8 @@ trait Transition[Input, Output, State] { */ def exceptionStrategy: TransitionExceptionHandler = (e, n) => BlockTransition - /** - * Given the in and out adjacent places with their weight returns a function: - * - * (Mi, S, I) => (Mo, O) - * - * Where: - * - * Mi is the in-adjacent marking, the tokens this transition consumes. S is the context state. I is input data - * - * Mo is the out-adjacent marking, the tokens this transition produces. O is the emitted output - * - * TODO instead of requiring this on a Transition trait a Type-Class approach looks more flexible. - * - * For example some type T[_, _, _] we have T => TransitionFunction The same goes for other properties defined on this - * trait. - * - * This way we can forgo with the entire Transition trait and let user's use whatever they want. For example, in - * simple cases (uncolored place / transition nets) an identifier (Int or Long) as a type is enough. - * - * @param inAdjacent - * @param outAdjacent - * @return - */ - def apply[F[_]](inAdjacent: MultiSet[Place[_]], outAdjacent: MultiSet[Place[_]])(implicit - sync: Sync[F], - errorHandling: ApplicativeError[F, Throwable] - ): TransitionFunction[F, Input, Output, State] - /** * The state event sourcing function. */ - def updateState: State => Output => State + def updateState: S => O => S } diff --git a/api/src/main/scala/io/kagera/api/colored/TransitionExecutor.scala b/api/src/main/scala/io/kagera/api/colored/TransitionExecutor.scala index 8e5bfa8b..4a51ace1 100644 --- a/api/src/main/scala/io/kagera/api/colored/TransitionExecutor.scala +++ b/api/src/main/scala/io/kagera/api/colored/TransitionExecutor.scala @@ -1,46 +1,55 @@ package io.kagera.api.colored import cats.ApplicativeError -import cats.effect.{ IO, Sync } +import cats.syntax.applicativeError._ import io.kagera.api._ -trait TransitionExecutor[F[_], State] { +trait TransitionExecutor[F[_], T] { /** * Given a transition returns an input output function * * @param t - * @tparam Input - * @tparam Output * @return */ - def fireTransition[Input, Output](t: Transition[Input, Output, State]): TransitionFunction[F, Input, Output, State] + def fireTransition(t: T)(implicit + executorFactory: TransitionExecutorFactory[F, T] + ): TransitionFunctionF[F, executorFactory.Input, executorFactory.Output, executorFactory.State] } -class TransitionExecutorImpl[F[_], State](topology: ColoredPetriNet)(implicit - sync: Sync[F], +class TransitionExecutorImpl[F[_], T](topology: ColoredPetriNet[T])(implicit errorHandling: ApplicativeError[F, Throwable] -) extends TransitionExecutor[F, State] { +) extends TransitionExecutor[F, T] { - val cachedTransitionFunctions: Map[Transition[_, _, _], _] = - topology.transitions.map(t => t -> t.apply[F](topology.inMarking(t), topology.outMarking(t))).toMap + /* + TODO: Reintroduce caching + val cachedTransitionFunctions: Map[T, TransitionFunctionF[F, Input, Output, State]] = + topology.transitions.map(t => t -> executorFactory.createTransitionExecutor(t, topology.inMarking(t), topology.outMarking(t))).toMap - def transitionFunction[Input, Output](t: Transition[Input, Output, State]) = - cachedTransitionFunctions(t).asInstanceOf[TransitionFunction[F, Input, Output, State]] - - def fireTransition[Input, Output]( - t: Transition[Input, Output, State] - ): TransitionFunction[F, Input, Output, State] = { (consume, state, input) => - def handleFailure: PartialFunction[Throwable, F[(Marking, Output)]] = { case e: Throwable => - errorHandling.raiseError(e).asInstanceOf[F[(Marking, Output)]] - } - - if (consume.multiplicities != topology.inMarking(t)) { - errorHandling.raiseError(new IllegalArgumentException(s"Transition $t may not consume $consume")) - } + def transitionFunction(t: T): TransitionFunctionF[F, Input, Output, State] = cachedTransitionFunctions(t) + */ - try { - errorHandling.handleErrorWith(transitionFunction[Input, Output](t)(consume, state, input)) { handleFailure } - } catch { handleFailure } + def fireTransition(t: T)(implicit + executorFactory: TransitionExecutorFactory[F, T] + ): TransitionFunctionF[F, executorFactory.Input, executorFactory.Output, executorFactory.State] = { + (consume, state, input) => + def handleFailure: PartialFunction[Throwable, F[(Marking, executorFactory.Output)]] = { case e: Throwable => + errorHandling.raiseError(e).asInstanceOf[F[(Marking, executorFactory.Output)]] + } + + if (consume.multiplicities != topology.inMarking(t)) { + errorHandling.raiseError(new IllegalArgumentException(s"Transition $t may not consume $consume")) + } + + val transitionFunction + : TransitionFunctionF[F, executorFactory.Input, executorFactory.Output, executorFactory.State] = + executorFactory.createTransitionExecutor(t, topology.inMarking(t), topology.outMarking(t)) + try { + errorHandling.handleErrorWith(transitionFunction(consume, state, input)) { + handleFailure + } + } catch { + handleFailure + } } } diff --git a/api/src/main/scala/io/kagera/api/colored/TransitionExecutorFactory.scala b/api/src/main/scala/io/kagera/api/colored/TransitionExecutorFactory.scala new file mode 100644 index 00000000..71e63f4b --- /dev/null +++ b/api/src/main/scala/io/kagera/api/colored/TransitionExecutorFactory.scala @@ -0,0 +1,92 @@ +package io.kagera.api.colored + +import io.kagera.api.multiset.MultiSet + +import scala.reflect.ClassTag + +abstract class TransitionExecutorFactory[F[_], T : ClassTag] { + type Input + type Output + type State + + /** + * Given the in and out adjacent places with their weight returns a function: + * + * (Mi, S, I) => (Mo, O) + * + * Where: + * + * Mi is the in-adjacent marking, the tokens this transition consumes. S is the context state. I is input data + * + * Mo is the out-adjacent marking, the tokens this transition produces. O is the emitted output + * + * TODO instead of requiring this on a Transition trait a Type-Class approach looks more flexible. + * + * For example some type T[_, _, _] we have T => TransitionFunction The same goes for other properties defined on this + * trait. + * + * This way we can forgo with the entire Transition trait and let user's use whatever they want. For example, in + * simple cases (uncolored place / transition nets) an identifier (Int or Long) as a type is enough. + * + * @param inAdjacent + * @param outAdjacent + * @return + */ + def createTransitionExecutor( + t: T, + inAdjacent: MultiSet[Place[_]], + outAdjacent: MultiSet[Place[_]] + ): TransitionFunctionF[F, Input, Output, State] + + /** + * Choose the first succeeding decoder. + */ + final def or[T2 >: T : ClassTag, S]( + d: => TransitionExecutorFactory.WithInputOutputState[F, T2, Input, Output, S] + ): TransitionExecutorFactory.WithInputOutputState[F, T2, Input, Output, _] = new TransitionExecutorFactory[F, T2] { + self => + /** + * Given the in and out adjacent places with their weight returns a function: + * + * (Mi, S, I) => (Mo, O) + * + * Where: + * + * Mi is the in-adjacent marking, the tokens this transition consumes. S is the context state. I is input data + * + * Mo is the out-adjacent marking, the tokens this transition produces. O is the emitted output + * + * TODO instead of requiring this on a Transition trait a Type-Class approach looks more flexible. + * + * For example some type T[_, _, _] we have T => TransitionFunction The same goes for other properties defined on + * this trait. + * + * This way we can forgo with the entire Transition trait and let user's use whatever they want. For example, in + * simple cases (uncolored place / transition nets) an identifier (Int or Long) as a type is enough. + * + * @param inAdjacent + * @param outAdjacent + * @return + */ + override def createTransitionExecutor( + t: T2, + inAdjacent: MultiSet[Place[_]], + outAdjacent: MultiSet[Place[_]] + ): TransitionFunctionF[F, Input, Output, State] = t match { + case t: T => self.createTransitionExecutor(t, inAdjacent, outAdjacent) + case t: T2 => d.createTransitionExecutor(t, inAdjacent, outAdjacent) + case t => throw new RuntimeException(s"Don't know how to create a TransitionExecutor for $t") + } + } +} + +object TransitionExecutorFactory { + type WithState[F[_], T, S] = TransitionExecutorFactory[F, T] { + type State = S + } + type WithInputOutputState[F[_], T, I, O, S] = TransitionExecutorFactory[F, T] { + type Input >: I + type Output = O + type State = S + } +} diff --git a/api/src/main/scala/io/kagera/api/colored/dsl/SequenceNet.scala b/api/src/main/scala/io/kagera/api/colored/dsl/SequenceNet.scala index be6e486b..b4e53b69 100644 --- a/api/src/main/scala/io/kagera/api/colored/dsl/SequenceNet.scala +++ b/api/src/main/scala/io/kagera/api/colored/dsl/SequenceNet.scala @@ -1,27 +1,47 @@ package io.kagera.api.colored.dsl -import cats.effect.Sync +import cats.Applicative +import cats.effect.IO import io.kagera.api.colored.ExceptionStrategy.BlockTransition import io.kagera.api.colored._ -import io.kagera.api.colored.transitions.{ AbstractTransition, UncoloredTransition } +import io.kagera.api.colored.transitions.{ AbstractTransition, UncoloredTransitionExecutorFactory } import scala.concurrent.duration.Duration case class TransitionBehaviour[S, E](automated: Boolean, exceptionHandler: TransitionExceptionHandler, fn: S => E) { def asTransition(id: Long, eventSource: S => E => S) = - new AbstractTransition[Unit, E, S](id, s"t$id", automated, Duration.Undefined, exceptionHandler) - with UncoloredTransition[Unit, E, S] { - override val toString = label - override val updateState = eventSource - override def produceEvent[F[_] : Sync](consume: Marking, state: S, input: Unit): F[E] = Sync.apply.delay { - (fn(state)) - } - } + new SequenceNetTransition[S, E](id, automated, exceptionHandler, eventSource, fn) +} + +class SequenceNetTransition[S, E]( + id: Long, + automated: Boolean, + exceptionHandler: TransitionExceptionHandler, + eventSource: S => E => S, + val fn: S => E +) extends AbstractTransition[Any, E, S](id, s"t$id", automated, Duration.Undefined, exceptionHandler) { + override val toString = label + override val updateState = eventSource +} + +class SequenceNetTransitionExecutorFactory[F[_] : Applicative, S, E] + extends UncoloredTransitionExecutorFactory[F, SequenceNetTransition[S, E]] { + type Input = Any + type Output = E + type State = S + override def produceEvent(t: SequenceNetTransition[S, E], consume: Marking, state: S, input: Any): F[E] = + Applicative.apply.pure(t.fn(state)) +} + +object SequenceNetTransitionExecutorFactory { + implicit def sequenceNetTransitionExecutorFactory[F[_] : Applicative, S, E] + : TransitionExecutorFactory[F, SequenceNetTransition[S, E]] = new SequenceNetTransitionExecutorFactory[F, S, E] } trait SequenceNet[S, E] { def sequence: Seq[TransitionBehaviour[S, E]] + def eventSourcing: S => E => S lazy val places = (1 to (sequence.size + 1)).map(i => Place[Unit](id = i)) @@ -29,6 +49,7 @@ trait SequenceNet[S, E] { lazy val initialMarking = Marking(place(1) -> 1) def place(n: Int) = places(n - 1) + def transition(automated: Boolean = false, exceptionHandler: TransitionExceptionHandler = (e, n) => BlockTransition)( fn: S => E ): TransitionBehaviour[S, E] = TransitionBehaviour(automated, exceptionHandler, fn) @@ -40,6 +61,6 @@ trait SequenceNet[S, E] { val places = (1 to (nrOfSteps + 1)).map(i => Place[Unit](id = i)) val tpedges = transitions.zip(places.tail).map { case (t, p) => arc(t, p, 1) } val ptedges = places.zip(transitions).map { case (p, t) => arc(p, t, 1) } - process[S]((tpedges ++ ptedges): _*) + process[S, SequenceNetTransition[S, E]]((tpedges ++ ptedges): _*) } } diff --git a/api/src/main/scala/io/kagera/api/colored/dsl/StateTransitionNet.scala b/api/src/main/scala/io/kagera/api/colored/dsl/StateTransitionNet.scala index 1889b4f4..fb0f7591 100644 --- a/api/src/main/scala/io/kagera/api/colored/dsl/StateTransitionNet.scala +++ b/api/src/main/scala/io/kagera/api/colored/dsl/StateTransitionNet.scala @@ -1,8 +1,9 @@ package io.kagera.api.colored.dsl -import cats.effect.Sync +import cats.Applicative +import cats.effect.IO import io.kagera.api.colored.ExceptionStrategy.BlockTransition -import io.kagera.api.colored.transitions.{ AbstractTransition, UncoloredTransition } +import io.kagera.api.colored.transitions.{ AbstractTransition, UncoloredTransitionExecutorFactory } import io.kagera.api.colored.{ Marking, Transition, _ } import scala.concurrent.duration.Duration @@ -17,14 +18,36 @@ trait StateTransitionNet[S, E] { label: Option[String] = None, automated: Boolean = false, exceptionStrategy: TransitionExceptionHandler = (e, n) => BlockTransition - )(fn: S => E): Transition[Unit, E, S] = - new AbstractTransition[Unit, E, S](id, label.getOrElse(s"t$id"), automated, Duration.Undefined, exceptionStrategy) - with UncoloredTransition[Unit, E, S] { - override val toString = label - override val updateState = eventSourcing - override def produceEvent[F[_]](consume: Marking, state: S, input: Unit)(implicit sync: Sync[F]): F[E] = - Sync.apply.delay { (fn(state)) } - } + )(fn: S => E): StateTransitionNetTransition[S, E] = + new StateTransitionNetTransition[S, E]( + id, + label.getOrElse(s"t$id"), + automated, + exceptionStrategy, + eventSourcing, + fn + ) + def createPetriNet(arcs: Arc[StateTransitionNetTransition[S, E]]*) = + process[S, StateTransitionNetTransition[S, E]](arcs: _*) +} - def createPetriNet(arcs: Arc*) = process[S](arcs: _*) +class StateTransitionNetTransition[S, E]( + id: Long, + label: String, + automated: Boolean, + exceptionStrategy: TransitionExceptionHandler, + val eventSourcing: S => E => S, + val transitionFunction: S => E +) extends AbstractTransition[Unit, E, S](id, label, automated, Duration.Undefined, exceptionStrategy) + with Transition[Unit, E, S] { + override val toString = label + override val updateState = eventSourcing +} +class StateTransitionNetExecutorFactory[S, E] + extends UncoloredTransitionExecutorFactory[cats.Id, StateTransitionNetTransition[S, E]] { + type Input = Unit + type Output = E + type State = S + override def produceEvent(t: StateTransitionNetTransition[S, E], consume: Marking, state: S, input: Unit): E = + t.transitionFunction(state) } diff --git a/api/src/main/scala/io/kagera/api/colored/dsl/package.scala b/api/src/main/scala/io/kagera/api/colored/dsl/package.scala index 7b803d87..8bbaa4bd 100644 --- a/api/src/main/scala/io/kagera/api/colored/dsl/package.scala +++ b/api/src/main/scala/io/kagera/api/colored/dsl/package.scala @@ -1,6 +1,5 @@ package io.kagera.api.colored -import cats.ApplicativeError import cats.effect.Sync import io.kagera.api._ import io.kagera.api.colored.transitions.{ AbstractTransition, IdentityTransition } @@ -22,48 +21,60 @@ import scala.concurrent.duration.Duration */ package object dsl { - implicit class TransitionDSL[Input, Output, State](t: Transition[Input, Output, State]) { - def ~>(p: Place[_], weight: Long = 1): Arc = arc(t, p, weight) + implicit class TransitionDSL[T <: Transition[_, _, _]](t: T) { + def ~>(p: Place[_], weight: Long = 1): Arc[T] = arc(t, p, weight) } implicit class PlaceDSL[C](p: Place[C]) { - def ~>(t: Transition[_, _, _], weight: Long = 1, filter: C => Boolean = token => true): Arc = - arc(p, t, weight, filter) + def ~>[T](t: T, weight: Long = 1, filter: C => Boolean = token => true): Arc[T] = arc(p, t, weight, filter) } - def arc(t: Transition[_, _, _], p: Place[_], weight: Long): Arc = - WLDiEdge[Node, String](Right(t), Left(p))(weight, "") + def arc[T](t: T, p: Place[_], weight: Long): Arc[T] = WLDiEdge[Node[T], String](Right(t), Left(p))(weight, "") - def arc[C](p: Place[C], t: Transition[_, _, _], weight: Long, filter: C => Boolean = (token: C) => true): Arc = { + def arc[C, T](p: Place[C], t: T, weight: Long, filter: C => Boolean = (token: C) => true): Arc[T] = { val innerEdge = new PTEdgeImpl[C](weight, filter) - WLDiEdge[Node, PTEdge[C]](Left(p), Right(t))(weight, innerEdge) + WLDiEdge[Node[T], PTEdge[C]](Left(p), Right(t))(weight, innerEdge) } - def constantTransition[I, O, S](id: Long, label: Option[String] = None, automated: Boolean = false, constant: O) = - new AbstractTransition[I, O, S](id, label.getOrElse(s"t$id"), automated, Duration.Undefined) + class ConstantTransition[I, O, S](id: Long, label: String, automated: Boolean, val constant: O) + extends AbstractTransition[I, O, S](id, label, automated, Duration.Undefined) with IdentityTransition[I, O, S] { - override val toString = label + override val toString = label + } + def constantTransition[I, O, S](id: Long, label: Option[String] = None, automated: Boolean = false, constant: O) = + new ConstantTransition[I, O, S](id, label.getOrElse(s"t$id"), automated, constant) - override def apply[F[_]](inAdjacent: MultiSet[Place[_]], outAdjacent: MultiSet[Place[_]])(implicit - sync: Sync[F], - applicativeError: ApplicativeError[F, Throwable] - ) = - (marking, state, input) => - sync.delay { - val produced = outAdjacent.map { case (place, weight) => - place -> Map(constant -> weight) - }.toMarking + class ConstantTransitionExecutorFactory[F[_] : Sync, I, O, S] + extends TransitionExecutorFactory[F, ConstantTransition[I, O, S]] { + type Input = I + type Output = O + type State = S - (produced, constant) - } - } + override def createTransitionExecutor( + t: ConstantTransition[I, O, S], + inAdjacent: MultiSet[Place[_]], + outAdjacent: MultiSet[Place[_]] + ): TransitionFunctionF[F, I, O, S] = + (marking, state, input) => + Sync.apply.delay { + val produced = outAdjacent.map { case (place, weight) => + place -> Map(t.constant -> weight) + }.toMarking + + (produced, t.constant) + } + } - def nullTransition[S](id: Long, label: Option[String] = None, automated: Boolean = false): Transition[Unit, Unit, S] = + def nullTransition[S]( + id: Long, + label: Option[String] = None, + automated: Boolean = false + ): ConstantTransition[Unit, Unit, S] = constantTransition[Unit, Unit, S](id, label, automated, ()) - def process[S](params: Arc*): ExecutablePetriNet[S] = { - val petriNet = new ScalaGraphPetriNet(Graph(params: _*)) with ColoredTokenGame + def process[S, T <: Transition[_, _, _]](params: Arc[T]*): ExecutablePetriNet[S, T] = { + val petriNet = new ScalaGraphPetriNet[Place[_], T](Graph(params: _*)) with ColoredTokenGame[T] requireUniqueElements(petriNet.places.toSeq.map(_.id), "Place identifier") requireUniqueElements(petriNet.transitions.toSeq.map(_.id), "Transition identifier") diff --git a/api/src/main/scala/io/kagera/api/colored/package.scala b/api/src/main/scala/io/kagera/api/colored/package.scala index 4e53bd56..7311eb3f 100644 --- a/api/src/main/scala/io/kagera/api/colored/package.scala +++ b/api/src/main/scala/io/kagera/api/colored/package.scala @@ -1,22 +1,25 @@ package io.kagera.api +import cats.effect.IO import io.kagera.api.multiset._ + +import scala.language.existentials import scala.collection.compat._ +import scala.language.higherKinds +import scalax.collection.Graph import scalax.collection.edge.WLDiEdge -import scala.language.{ existentials, higherKinds } - package object colored { /** * Type alias for the node type of the scalax.collection.Graph backing the petri net. */ - type Node = Either[Place[_], Transition[_, _, _]] + type Node[+T] = Either[Place[_], T] /** * Type alias for the edge type of the scalax.collection.Graph backing the petri net. */ - type Arc = WLDiEdge[Node] + type Arc[+T] = WLDiEdge[Node[T]] /** * Type alias for a single marked place, meaning a place containing tokens. @@ -36,7 +39,7 @@ package object colored { * @tparam State * The state the transition closes over. */ - type TransitionFunction[F[_], Input, Output, State] = (Marking, State, Input) => F[(Marking, Output)] + type TransitionFunctionF[F[_], Input, Output, State] = (Marking, State, Input) => F[(Marking, Output)] /** * An exception handler function associated with a transition. @@ -46,7 +49,7 @@ package object colored { /** * Type alias for a colored petri net. */ - type ColoredPetriNet = PetriNet[Place[_], Transition[_, _, _]] + type ColoredPetriNet[T] = PetriNet[Place[_], T] /** * Type alias for a marking. @@ -102,7 +105,7 @@ package object colored { * @tparam S * The 'global' state transitions close over */ - type ExecutablePetriNet[S] = ColoredPetriNet with ColoredTokenGame + type ExecutablePetriNet[S, T] = ColoredPetriNet[T] with ColoredTokenGame[T] implicit def toMarkedPlace(tuple: (Place[Unit], Int)): MarkedPlace[Unit] = tuple._1 -> Map[Unit, Int](() -> tuple._2) @@ -110,8 +113,8 @@ package object colored { def toMarking: Marking = HMap[Place, MultiSet](i.toMap[Place[_], MultiSet[_]]) } - implicit class ColoredPetriNetAdditions(petriNet: ColoredPetriNet) { - def getEdge(p: Place[_], t: Transition[_, _, _]): Option[PTEdge[Any]] = + implicit class ColoredPetriNetAdditions[T](petriNet: ColoredPetriNet[T]) { + def getEdge(p: Place[_], t: T): Option[PTEdge[Any]] = petriNet.innerGraph.findPTEdge(p, t).map(_.label.asInstanceOf[PTEdge[Any]]) } diff --git a/api/src/main/scala/io/kagera/api/colored/transitions/AbstractTransition.scala b/api/src/main/scala/io/kagera/api/colored/transitions/AbstractTransition.scala index f6f81505..2cd1d3bd 100644 --- a/api/src/main/scala/io/kagera/api/colored/transitions/AbstractTransition.scala +++ b/api/src/main/scala/io/kagera/api/colored/transitions/AbstractTransition.scala @@ -5,7 +5,7 @@ import io.kagera.api.colored.{ Transition, _ } import scala.concurrent.duration.Duration -abstract class AbstractTransition[I, O, S]( +abstract class AbstractTransition[-I, O, S]( override val id: Long, override val label: String, override val isAutomated: Boolean, diff --git a/api/src/main/scala/io/kagera/api/colored/transitions/UncoloredTransition.scala b/api/src/main/scala/io/kagera/api/colored/transitions/UncoloredTransition.scala deleted file mode 100644 index cfdf2b62..00000000 --- a/api/src/main/scala/io/kagera/api/colored/transitions/UncoloredTransition.scala +++ /dev/null @@ -1,22 +0,0 @@ -package io.kagera.api.colored.transitions - -import cats.ApplicativeError -import cats.effect.Sync -import io.kagera.api.colored._ -import io.kagera.api.multiset.MultiSet - -trait UncoloredTransition[Input, Output, State] extends Transition[Input, Output, State] { - - override def apply[F[_]](inAdjacent: MultiSet[Place[_]], outAdjacent: MultiSet[Place[_]])(implicit - sync: Sync[F], - errorHandling: ApplicativeError[F, Throwable] - ) = { (consume, state, input) => - { - // assumes uncolored outgoing places (Place[Unit]) - val produce = outAdjacent.map { case (p, count) => p -> Map(() -> count) }.toMarking - sync.map(produceEvent[F](consume, state, input))(output => (produce, output)) - } - } - - def produceEvent[F[_] : Sync](consume: Marking, state: State, input: Input): F[Output] -} diff --git a/api/src/main/scala/io/kagera/api/colored/transitions/UncoloredTransitionExecutorFactory.scala b/api/src/main/scala/io/kagera/api/colored/transitions/UncoloredTransitionExecutorFactory.scala new file mode 100644 index 00000000..c101bbb0 --- /dev/null +++ b/api/src/main/scala/io/kagera/api/colored/transitions/UncoloredTransitionExecutorFactory.scala @@ -0,0 +1,23 @@ +package io.kagera.api.colored.transitions + +import cats.Functor +import cats.syntax.functor._ +import io.kagera.api.colored._ +import io.kagera.api.multiset.MultiSet + +abstract class UncoloredTransitionExecutorFactory[F[_] : Functor, T] extends TransitionExecutorFactory[F, T] { + + override def createTransitionExecutor( + t: T, + inAdjacent: MultiSet[Place[_]], + outAdjacent: MultiSet[Place[_]] + ): TransitionFunctionF[F, Input, Output, State] = { (consume, state, input) => + { + // assumes uncolored outgoing places (Place[Unit]) + val produce = outAdjacent.map { case (p, count) => p -> Map(() -> count) }.toMarking + produceEvent(t, consume, state, input).map(output => (produce, output)) + } + } + + def produceEvent(t: T, consume: Marking, state: State, input: Input): F[Output] +} diff --git a/api/src/main/scala/io/kagera/api/package.scala b/api/src/main/scala/io/kagera/api/package.scala index 9acc3c3a..7b7dd642 100644 --- a/api/src/main/scala/io/kagera/api/package.scala +++ b/api/src/main/scala/io/kagera/api/package.scala @@ -1,5 +1,6 @@ package io.kagera +import io.kagera.api.colored.{ Place, Transition } import io.kagera.api.multiset.MultiSet import scala.PartialFunction._ @@ -13,7 +14,11 @@ package object api { case class Id(value: Long) extends AnyVal case class Label(value: String) extends AnyVal - type Identifiable[T] = T => Id + trait Identifiable[T] { + def identify(t: T): Id + } + implicit def identifiableTransition[T <: Transition[_, _, _]]: Identifiable[T] = t => Id(t.id) + implicit def identifiablePlace[P <: Place[_]]: Identifiable[P] = p => Id(p.id) type Labeled[T] = T => Label implicit class LabeledFn[T : Labeled](seq: Iterable[T]) { @@ -24,7 +29,7 @@ package object api { } implicit class IdFn[T : Identifiable](seq: Iterable[T]) { - def findById(id: Long): Option[T] = seq.find(e => implicitly[Identifiable[T]].apply(e).value == id) + def findById(id: Long): Option[T] = seq.find(e => implicitly[Identifiable[T]].identify(e).value == id) def getById(id: Long): T = findById(id).getOrElse { throw new IllegalStateException(s"No element found with id: $id") } diff --git a/api/src/test/scala/io/kagera/api/colored/TokenGameSpec.scala b/api/src/test/scala/io/kagera/api/colored/TokenGameSpec.scala index 0b3d2e76..c9dd747b 100644 --- a/api/src/test/scala/io/kagera/api/colored/TokenGameSpec.scala +++ b/api/src/test/scala/io/kagera/api/colored/TokenGameSpec.scala @@ -12,7 +12,12 @@ class TokenGameSpec extends AnyWordSpec { val t1 = constantTransition[Int, Int, Unit](id = 1, automated = false, constant = 42) val t2 = constantTransition[Int, Int, Unit](id = 2, automated = false, constant = 5) - val testProcess = process(p1 ~> (t1, filter = _ > 42), p1 ~> (t2, weight = 3), t1 ~> p2, t2 ~> p2) + val testProcess = process( + p1 ~> (t1, filter = _ > 42), + p1 ~> (t2, weight = 3), + TransitionDSL[ConstantTransition[Int, Int, Unit]](t1) ~> p2, + TransitionDSL[ConstantTransition[Int, Int, Unit]](t2) ~> p2 + ) "The Colored Token game" should { diff --git a/demo/jvm/src/main/scala/io/kagera/demo/Queries.scala b/demo/jvm/src/main/scala/io/kagera/demo/Queries.scala index 40b8c149..65284718 100644 --- a/demo/jvm/src/main/scala/io/kagera/demo/Queries.scala +++ b/demo/jvm/src/main/scala/io/kagera/demo/Queries.scala @@ -26,7 +26,7 @@ trait Queries { } -class AggregateMarking[S](topology: ExecutablePetriNet[S]) extends Actor { +class AggregateMarking[S](topology: ExecutablePetriNet[S, _]) extends Actor { override def receive: Receive = updateMarking(MultiSet.empty) diff --git a/demo/jvm/src/main/scala/io/kagera/demo/http/Routes.scala b/demo/jvm/src/main/scala/io/kagera/demo/http/Routes.scala index 5488d058..04e5a61a 100644 --- a/demo/jvm/src/main/scala/io/kagera/demo/http/Routes.scala +++ b/demo/jvm/src/main/scala/io/kagera/demo/http/Routes.scala @@ -10,7 +10,7 @@ import akka.util.{ ByteString, Timeout } import de.heikoseeberger.akkahttpupickle.UpickleSupport import io.kagera.akka.actor.PetriNetInstance import io.kagera.akka.actor.PetriNetInstanceProtocol._ -import io.kagera.api.colored.{ ExecutablePetriNet, Generators, Marking } +import io.kagera.api.colored.{ ExecutablePetriNet, Generators, Marking, Transition } import io.kagera.demo.{ ConfiguredActorSystem, Queries } import upickle.default._ @@ -32,7 +32,9 @@ trait Routes extends Directives with Queries with UpickleSupport { ) } - val repository: Map[String, ExecutablePetriNet[_]] = Map("test" -> Generators.Uncolored.sequence(5)) + val repository: Map[String, ExecutablePetriNet[_, Transition[_, _, _]]] = Map( + "test" -> Generators.Uncolored.sequence(5) + ) val indexRoute = path("index.html") { get { @@ -50,7 +52,7 @@ trait Routes extends Directives with Queries with UpickleSupport { get { repository.get(id) match { case None => complete(StatusCodes.NotFound -> s"no such process: $id") - case Some(process) => complete(upickle.default.write(Util.toModel(process))) + case Some(process) => complete(upickle.default.write(Util.toModel[Transition[_, _, _]](process))) } } } @@ -69,9 +71,9 @@ trait Routes extends Directives with Queries with UpickleSupport { } ~ path("create" / Segment) { topologyId => post { - val topology = repository(topologyId).asInstanceOf[ExecutablePetriNet[Unit]] + val topology = repository(topologyId).asInstanceOf[ExecutablePetriNet[Unit, _]] val id = java.util.UUID.randomUUID.toString -// system.actorOf(PetriNetInstance.props(topology, Marking.empty, ()), id) + // system.actorOf(PetriNetInstance.props(topology, Marking.empty, ()), id) complete(id) } } ~ diff --git a/demo/jvm/src/main/scala/io/kagera/demo/http/Util.scala b/demo/jvm/src/main/scala/io/kagera/demo/http/Util.scala index 1b21f653..b9d11355 100644 --- a/demo/jvm/src/main/scala/io/kagera/demo/http/Util.scala +++ b/demo/jvm/src/main/scala/io/kagera/demo/http/Util.scala @@ -4,7 +4,7 @@ import io.kagera.api.colored.{ Place, Transition, _ } import io.kagera.demo.model object Util { - def toModel(pn: ExecutablePetriNet[_]): model.PetriNetModel = { + def toModel[T <: Transition[_, _, _]](pn: ExecutablePetriNet[_, T]): model.PetriNetModel = { val places = pn.nodes.collect { case Left(p) => model.Place(p.id, p.label) diff --git a/execution/src/main/scala/io/kagera/execution/EventSourcing.scala b/execution/src/main/scala/io/kagera/execution/EventSourcing.scala index 3a6e5be8..d3477c58 100644 --- a/execution/src/main/scala/io/kagera/execution/EventSourcing.scala +++ b/execution/src/main/scala/io/kagera/execution/EventSourcing.scala @@ -44,12 +44,12 @@ object EventSourcing { */ case class InitializedEvent(marking: Marking, state: Any) extends Event - def applyEvent[S](e: Event): State[Instance[S], Unit] = State.modify { instance => + def applyEvent[S, T <: Transition[_, _, S]](e: Event): State[Instance[S, T], Unit] = State.modify { instance => e match { case InitializedEvent(initialMarking, initialState) => - Instance[S](instance.process, 1, initialMarking, initialState.asInstanceOf[S], Map.empty) + Instance[S, T](instance.process, 1, initialMarking, initialState.asInstanceOf[S], Map.empty) case e: TransitionFiredEvent => - val t = instance.transitionById(e.transitionId).get.asInstanceOf[Transition[_, Any, S]] + val t = instance.process.transitions.getById(e.transitionId).asInstanceOf[Transition[_, Any, S]] val newState = e.output.map(t.updateState(instance.state)).getOrElse(instance.state) instance.copy( @@ -59,17 +59,10 @@ object EventSourcing { jobs = instance.jobs - e.jobId ) case e: TransitionFailedEvent => - val job = instance.jobs.getOrElse( - e.jobId, - Job[S, Any]( - e.jobId, - instance.state, - instance.transitionById(e.transitionId).asInstanceOf[Transition[Any, Any, S]], - e.consume, - e.input, - None - ) - ) + val job = instance.jobs.get(e.jobId).getOrElse { + val transition = instance.process.transitions.getById(e.transitionId) + Job[S, T](e.jobId, instance.state, transition, e.consume, e.input, None) + } val failureCount = job.failureCount + 1 val updatedJob = job.copy(failure = Some(ExceptionState(e.transitionId, failureCount, e.failureReason, e.exceptionStrategy))) diff --git a/execution/src/main/scala/io/kagera/execution/Instance.scala b/execution/src/main/scala/io/kagera/execution/Instance.scala index b0622a97..a3b47936 100644 --- a/execution/src/main/scala/io/kagera/execution/Instance.scala +++ b/execution/src/main/scala/io/kagera/execution/Instance.scala @@ -8,16 +8,16 @@ import scala.collection.{ Iterable, Map } import scala.util.Random object Instance { - def uninitialized[S](process: ExecutablePetriNet[S]): Instance[S] = - Instance[S](process, 0, Marking.empty, null.asInstanceOf[S], Map.empty) + def uninitialized[S, T <: Transition[_, _, S]](process: ExecutablePetriNet[S, T]): Instance[S, T] = + Instance[S, T](process, 0, Marking.empty, null.asInstanceOf[S], Map.empty) } -case class Instance[S]( - process: ExecutablePetriNet[S], +case class Instance[S, T <: Transition[_, _, S]]( + process: ExecutablePetriNet[S, T], sequenceNr: Long, marking: Marking, state: S, - jobs: Map[Long, Job[S, _]] + jobs: Map[Long, Job[S, T]] ) { def enabledParameters: Map[Transition[_, _, _], Iterable[Marking]] = process.enabledParameters(availableMarking) def enabledTransitions: Set[Transition[_, _, _]] = process.enabledTransitions(marking) @@ -29,7 +29,7 @@ case class Instance[S]( // The marking that is available for new jobs lazy val availableMarking: Marking = marking |-| reservedMarking - def activeJobs: Iterable[Job[S, _]] = jobs.values.filter(_.isActive) + def activeJobs: Iterable[Job[S, T]] = jobs.values.filter(_.isActive) def failedJobs: Iterable[ExceptionState] = jobs.values.map(_.failure).flatten diff --git a/execution/src/main/scala/io/kagera/execution/Job.scala b/execution/src/main/scala/io/kagera/execution/Job.scala index 4bb32b9d..261a1929 100644 --- a/execution/src/main/scala/io/kagera/execution/Job.scala +++ b/execution/src/main/scala/io/kagera/execution/Job.scala @@ -6,10 +6,10 @@ import io.kagera.api.colored.{ Marking, Transition, _ } /** * A Job describes all the parameters that make a firing transition in a petri net. */ -case class Job[S, E]( +case class Job[S, T <: Transition[_, _, S]]( id: Long, processState: S, - transition: Transition[Any, E, S], + transition: T, consume: Marking, input: Any, failure: Option[ExceptionState] = None diff --git a/execution/src/main/scala/io/kagera/execution/package.scala b/execution/src/main/scala/io/kagera/execution/package.scala index c9ff3106..eafc2d63 100644 --- a/execution/src/main/scala/io/kagera/execution/package.scala +++ b/execution/src/main/scala/io/kagera/execution/package.scala @@ -4,10 +4,9 @@ import java.io.{ PrintWriter, StringWriter } import cats.data.State import cats.effect.IO -import execution.EventSourcing.TransitionEvent import io.kagera.api._ import io.kagera.api.colored._ -import execution.EventSourcing._ +import io.kagera.execution.EventSourcing._ import scala.collection.Set import scala.concurrent.ExecutionContext @@ -17,20 +16,20 @@ package object execution { /** * Fires a specific transition with input, computes the marking it should consume */ - def fireTransition[S, E]( - transition: Transition[Any, E, S], + def fireTransition[S, T <: Transition[Any, _, S]]( + transition: T, input: Any - ): State[Instance[S], Either[String, Job[S, E]]] = + ): State[Instance[S, T], Either[String, Job[S, T]]] = State { instance => instance.isBlockedReason(transition.id) match { case Some(reason) => (instance, Left(reason)) case None => - instance.enabledParameters.get(transition) match { + instance.process.enabledParameters(instance.availableMarking).get(transition) match { case None => (instance, Left(s"Not enough consumable tokens")) case Some(params) => - val (updatedState, job) = createJob(transition, params.head, input)(instance) + val (updatedState, job) = createJob[S, T](transition, params.head, input)(instance) (updatedState, Right(job)) } } @@ -39,12 +38,12 @@ package object execution { /** * Creates a job for a specific input & marking. Does not do any validation on the parameters */ - def createJob[E, S]( - transition: Transition[Any, E, S], + def createJob[S, T <: Transition[Any, _, S]]( + transition: T, consume: Marking, input: Any - ): Instance[S] => (Instance[S], Job[S, E]) = s => { - val job = Job[S, E](s.nextJobId(), s.state, transition, consume, input) + ): Instance[S, T] => (Instance[S, T], Job[S, T]) = s => { + val job = Job[S, T](s.nextJobId(), s.state, transition, consume, input) val newState = s.copy(jobs = s.jobs + (job.id -> job)) (newState, job) } @@ -52,49 +51,53 @@ package object execution { /** * Finds the (optional) first transition that is automated & enabled */ - def fireFirstEnabled[S]: State[Instance[S], Option[Job[S, _]]] = State { instance => - instance.enabledParameters + def fireFirstEnabled[S, T <: Transition[Any, _, S]]: State[Instance[S, T], Option[Job[S, T]]] = State { instance => + instance.process + .enabledParameters(instance.availableMarking) .find { case (t, markings) => t.isAutomated && !instance.isBlockedReason(t.id).isDefined } .map { case (t, markings) => - val job = - Job[S, Any](instance.nextJobId(), instance.state, t.asInstanceOf[Transition[Any, Any, S]], markings.head, ()) + val job = Job[S, T](instance.nextJobId(), instance.state, t, markings.head, ()) (instance.copy(jobs = instance.jobs + (job.id -> job)), Some(job)) } .getOrElse((instance, None)) } - def fireTransitionById[S](id: Long, input: Any): State[Instance[S], Either[String, Job[S, Any]]] = + def fireTransitionById[S, T <: Transition[Any, _, S]]( + id: Long, + input: Any + ): State[Instance[S, T], Either[String, Job[S, T]]] = State - .inspect[Instance[S], Option[Transition[Any, Any, S]]]( - _.transitionById(id).map(_.asInstanceOf[Transition[Any, Any, S]]) - ) + .inspect[Instance[S, T], Option[T]] { instance => + instance.process.transitions.findById(id) + } .flatMap { case None => State.pure(Left(s"No transition exists with id $id")) - case Some(t) => fireTransition(t, input) + case Some(t) => fireTransition[S, T](t, input) } /** * Finds all automated enabled transitions. */ - def fireAllEnabledTransitions[S]: State[Instance[S], Set[Job[S, _]]] = - fireFirstEnabled[S].flatMap { + def fireAllEnabledTransitions[S, T <: Transition[Any, _, S]]: State[Instance[S, T], Set[Job[S, T]]] = + fireFirstEnabled[S, T].flatMap { case None => State.pure(Set.empty) - case Some(job) => fireAllEnabledTransitions[S].map(_ + job) + case Some(job) => fireAllEnabledTransitions[S, T].map(_ + job) } /** * Executes a job returning a Task[TransitionEvent] */ - def runJobAsync[S, E](job: Job[S, E], executor: TransitionExecutor[IO, S])(implicit - S: ExecutionContext + def runJobAsync[S, T <: Transition[_, _, S]](job: Job[S, T], executor: TransitionExecutor[IO, T])(implicit + S: ExecutionContext, + executorFactory: TransitionExecutorFactory.WithInputOutputState[IO, T, Any, _, S] ): IO[TransitionEvent] = { val startTime = System.currentTimeMillis() - val transitionFunction: TransitionFunction[IO, Any, E, S] = executor.fireTransition(job.transition) - val transitionApplied: IO[(Marking, E)] = transitionFunction(job.consume, job.processState, job.input) - transitionApplied + executor + .fireTransition(job.transition) + .apply(job.consume, job.processState, job.input) .map { case (produced, out) => TransitionFiredEvent( job.id, diff --git a/visualization/src/main/scala/io/kagera/dot/GraphTheme.scala b/visualization/src/main/scala/io/kagera/dot/GraphTheme.scala index 81817276..ae4c7b72 100644 --- a/visualization/src/main/scala/io/kagera/dot/GraphTheme.scala +++ b/visualization/src/main/scala/io/kagera/dot/GraphTheme.scala @@ -15,5 +15,4 @@ trait GraphTheme[N, E[X] <: EdgeLikeIn[X]] { def rootAttrs: scala.Seq[DotAttr] = List.empty -// def edgeDotAttrFn: Graph[N, E#EdgeT => List[DotAttr] = edge => List.empty }