Skip to content

Commit

Permalink
Merge pull request #3 from nightscape/massive_updates
Browse files Browse the repository at this point in the history
Massive dependency updates
  • Loading branch information
nikolakasev authored May 24, 2020
2 parents c28f48e + e510e52 commit 5cbf303
Show file tree
Hide file tree
Showing 19 changed files with 76 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package io.kagera.akka.actor
import akka.actor.{ ActorLogging, ActorRef, PoisonPill, Props }
import akka.pattern.pipe
import akka.persistence.PersistentActor
import fs2.Strategy
import io.kagera.akka.actor.PetriNetInstance.Settings
import io.kagera.akka.actor.PetriNetInstanceProtocol._
import io.kagera.api.colored.ExceptionStrategy.RetryWithDelay
Expand All @@ -12,17 +11,18 @@ import io.kagera.api.colored._
import io.kagera.execution.EventSourcing._
import io.kagera.execution._

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.language.existentials

object PetriNetInstance {

case class Settings(
evaluationStrategy: Strategy,
evaluationStrategy: ExecutionContext,
idleTTL: Option[FiniteDuration])

val defaultSettings: Settings = Settings(
evaluationStrategy = Strategy.fromCachedDaemonPool("Kagera.CachedThreadPool"),
evaluationStrategy = ExecutionContext.Implicits.global,
idleTTL = Some(5 minutes)
)

Expand Down Expand Up @@ -152,7 +152,7 @@ class PetriNetInstance[S](
}

def executeJob[E](job: Job[S, E], originalSender: ActorRef) =
runJobAsync(job, executor)(settings.evaluationStrategy).unsafeRunAsyncFuture().pipeTo(context.self)(originalSender)
runJobAsync(job, executor)(settings.evaluationStrategy).unsafeToFuture().pipeTo(context.self)(originalSender)

override def onRecoveryCompleted(instance: Instance[S]) = step(instance)
}
23 changes: 11 additions & 12 deletions akka/src/main/scala/io/kagera/akka/actor/PetriNetInstanceApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import akka.pattern.ask
import akka.stream.scaladsl.{ Sink, Source, SourceQueueWithComplete }
import akka.stream.{ Materializer, OverflowStrategy }
import akka.util.Timeout
import cats.data.Xor
import io.kagera.akka.actor.PetriNetInstanceProtocol._
import io.kagera.api.colored.ExceptionStrategy.RetryWithDelay
import io.kagera.api.colored.{ Transition, _ }
Expand Down Expand Up @@ -79,29 +78,29 @@ class PetriNetInstanceApi[S](topology: ExecutablePetriNet[S], actor: ActorRef)(i
/**
* Fires a transition and confirms (waits) for the result of that transition firing.
*/
def askAndConfirmFirst(msg: Any)(implicit timeout: Timeout): Future[Xor[UnexpectedMessage, InstanceState[S]]] = {
def askAndConfirmFirst(msg: Any)(implicit timeout: Timeout): Future[Either[UnexpectedMessage, InstanceState[S]]] = {
actor.ask(msg).map {
case e: TransitionFired[_] Xor.Right(e.result.asInstanceOf[InstanceState[S]])
case msg @ _ Xor.Left(UnexpectedMessage(s"Received unexepected message: $msg"))
case e: TransitionFired[_] Right(e.result.asInstanceOf[InstanceState[S]])
case msg @ _ Left(UnexpectedMessage(s"Received unexepected message: $msg"))
}
}

def askAndConfirmFirstSync(msg: Any)(implicit timeout: Timeout): Xor[UnexpectedMessage, InstanceState[S]] = {
def askAndConfirmFirstSync(msg: Any)(implicit timeout: Timeout): Either[UnexpectedMessage, InstanceState[S]] = {
Await.result(askAndConfirmFirst(topology, msg), timeout.duration)
}

/**
* Fires a transition and confirms (waits) for all responses of subsequent automated transitions.
*/
def askAndConfirmAll(msg: Any, waitForRetries: Boolean = false)(implicit timeout: Timeout): Future[Xor[ErrorResponse, InstanceState[S]]] = {
def askAndConfirmAll(msg: Any, waitForRetries: Boolean = false)(implicit timeout: Timeout): Future[Either[ErrorResponse, InstanceState[S]]] = {

val futureMessages = askAndCollectAll(msg, waitForRetries).runWith(Sink.seq)

futureMessages.map {
_.lastOption match {
case Some(e: TransitionFired[_]) Xor.Right(e.result.asInstanceOf[InstanceState[S]])
case Some(msg) Xor.Left(UnexpectedMessage(s"Received unexpected message: $msg"))
case None Xor.Left(UnknownProcessId)
case Some(e: TransitionFired[_]) Right(e.result.asInstanceOf[InstanceState[S]])
case Some(msg) Left(UnexpectedMessage(s"Received unexpected message: $msg"))
case None Left(UnknownProcessId)
}
}
}
Expand All @@ -127,8 +126,8 @@ class PetriNetInstanceApi[S](topology: ExecutablePetriNet[S], actor: ActorRef)(i
*/
def askAndCollectAll(msg: Any, waitForRetries: Boolean = false): Source[TransitionResponse, NotUsed] = {
askSource[Any](actor, msg, takeWhileNotFailed(topology, waitForRetries)).map {
case e: TransitionResponse Xor.Right(e)
case msg @ _ Xor.Left(s"Received unexpected message: $msg")
}.takeWhile(_.isRight).map(_.asInstanceOf[Xor.Right[TransitionResponse]].b)
case e: TransitionResponse Right(e)
case msg @ _ Left(s"Received unexpected message: $msg")
}.takeWhile(_.isRight).map(_.asInstanceOf[Right[NotUsed, TransitionResponse]].value)
}
}
4 changes: 2 additions & 2 deletions akka/src/main/scala/io/kagera/execution/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package io.kagera
import java.io.{ PrintWriter, StringWriter }

import cats.data.State
import fs2.{ Strategy, Task }
import cats.effect.IO
import io.kagera.api._
import io.kagera.api.colored._
import io.kagera.execution.EventSourcing._

import scala.collection.Set
import scala.util.Random
import scala.concurrent.ExecutionContext

package object execution {

Expand Down
4 changes: 2 additions & 2 deletions akka/src/test/scala/io/kagera/akka/PetriNetInstanceSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package io.kagera.akka
import java.util.UUID

import akka.actor.{ ActorSystem, PoisonPill, Terminated }
import fs2.Strategy
import io.kagera.akka.PetriNetInstanceSpec._
import io.kagera.akka.actor.PetriNetInstance
import io.kagera.akka.actor.PetriNetInstance.Settings
Expand All @@ -13,6 +12,7 @@ import io.kagera.api.colored._
import io.kagera.api.colored.dsl._
import org.scalatest.time.{ Milliseconds, Span }

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object PetriNetInstanceSpec {
Expand Down Expand Up @@ -243,7 +243,7 @@ class PetriNetInstanceSpec extends AkkaTestBase {
val ttl = 500 milliseconds

val customSettings = Settings(
evaluationStrategy = Strategy.fromCachedDaemonPool("Kagera.CachedThreadPool"),
evaluationStrategy = ExecutionContext.Implicits.global,
idleTTL = Some(ttl)
)

Expand Down
4 changes: 2 additions & 2 deletions akka/src/test/scala/io/kagera/akka/QuerySpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package io.kagera.akka

import akka.actor.ActorSystem
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.scaladsl.{ AllPersistenceIdsQuery, CurrentEventsByPersistenceIdQuery, ReadJournal }
import akka.persistence.query.scaladsl.{CurrentEventsByPersistenceIdQuery, ReadJournal}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.testkit.{ ImplicitSender, TestKit }
Expand Down Expand Up @@ -32,7 +32,7 @@ class QuerySpec extends TestKit(ActorSystem("QuerySpec", AkkaTestBase.defaultTes

override def readJournal =
PersistenceQuery(system).readJournalFor("inmemory-read-journal")
.asInstanceOf[ReadJournal with CurrentEventsByPersistenceIdQuery with AllPersistenceIdsQuery]
.asInstanceOf[ReadJournal with CurrentEventsByPersistenceIdQuery]

val p1 = Place[Unit](id = 1)
val p2 = Place[Unit](id = 2)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.kagera.api.colored

import fs2.Task
import cats.effect.IO
import io.kagera.api._

trait TransitionExecutor[State] {
Expand All @@ -27,16 +27,16 @@ class TransitionExecutorImpl[State](topology: ColoredPetriNet) extends Transitio
def fireTransition[Input, Output](t: Transition[Input, Output, State]): TransitionFunction[Input, Output, State] = {
(consume, state, input)

def handleFailure: PartialFunction[Throwable, Task[(Marking, Output)]] = {
case e: Throwable Task.fail(e)
def handleFailure: PartialFunction[Throwable, IO[(Marking, Output)]] = {
case e: Throwable IO.raiseError(e).asInstanceOf[IO[(Marking, Output)]]
}

if (consume.multiplicities != topology.inMarking(t)) {
Task.fail(new IllegalArgumentException(s"Transition $t may not consume $consume"))
IO.raiseError(new IllegalArgumentException(s"Transition $t may not consume $consume"))
}

try {
transitionFunction(t)(consume, state, input).handleWith { handleFailure }
transitionFunction(t)(consume, state, input).handleErrorWith { handleFailure }
} catch { handleFailure }
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.kagera.api.colored.dsl

import fs2.Task
import cats.effect.IO
import io.kagera.api.colored.ExceptionStrategy.BlockTransition
import io.kagera.api.colored._
import io.kagera.api.colored.transitions.{ AbstractTransition, UncoloredTransition }
Expand All @@ -11,7 +11,7 @@ case class TransitionBehaviour[S, E](automated: Boolean, exceptionHandler: Trans
def asTransition(id: Long, eventSource: S E S) = new AbstractTransition[Unit, E, S](id, s"t$id", automated, Duration.Undefined, exceptionHandler) with UncoloredTransition[Unit, E, S] {
override val toString = label
override val updateState = eventSource
override def produceEvent(consume: Marking, state: S, input: Unit): Task[E] = Task.delay { (fn(state)) }
override def produceEvent(consume: Marking, state: S, input: Unit): IO[E] = IO.delay { (fn(state)) }
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.kagera.api.colored.dsl

import fs2.Task
import cats.effect.IO
import io.kagera.api.colored.ExceptionStrategy.BlockTransition
import io.kagera.api.colored.transitions.{ AbstractTransition, UncoloredTransition }
import io.kagera.api.colored.{ Marking, Transition, _ }
Expand All @@ -17,7 +17,7 @@ trait StateTransitionNet[S, E] {
new AbstractTransition[Unit, E, S](id, label.getOrElse(s"t$id"), automated, Duration.Undefined, exceptionStrategy) with UncoloredTransition[Unit, E, S] {
override val toString = label
override val updateState = eventSourcing
override def produceEvent(consume: Marking, state: S, input: Unit): Task[E] = Task.delay { (fn(state)) }
override def produceEvent(consume: Marking, state: S, input: Unit): IO[E] = IO.delay { (fn(state)) }
}

def createPetriNet(arcs: Arc*) = process[S](arcs: _*)
Expand Down
4 changes: 2 additions & 2 deletions api/src/main/scala/io/kagera/api/colored/dsl/package.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.kagera.api.colored

import fs2.Task
import cats.effect.IO
import io.kagera.api._
import io.kagera.api.colored.transitions.{ AbstractTransition, IdentityTransition }
import io.kagera.api.multiset._
Expand Down Expand Up @@ -44,7 +44,7 @@ package object dsl {
override val toString = label

override def apply(inAdjacent: MultiSet[Place[_]], outAdjacent: MultiSet[Place[_]]) =
(marking, state, input) Task.delay {
(marking, state, input) IO.delay {
val produced = outAdjacent.map {
case (place, weight) place -> Map(constant -> weight)
}.toMarking
Expand Down
5 changes: 3 additions & 2 deletions api/src/main/scala/io/kagera/api/colored/package.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package io.kagera.api

import fs2.Task
import cats.effect.IO
import io.kagera.api.multiset._

import scala.language.existentials
import scala.language.higherKinds
import scalax.collection.Graph
import scalax.collection.edge.WLDiEdge

Expand Down Expand Up @@ -33,7 +34,7 @@ package object colored {
* @tparam Output The output emitted by the transition.
* @tparam State The state the transition closes over.
*/
type TransitionFunction[Input, Output, State] = (Marking, State, Input) Task[(Marking, Output)]
type TransitionFunction[Input, Output, State] = (Marking, State, Input) IO[(Marking, Output)]

/**
* An exception handler function associated with a transition.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.kagera.api.colored.transitions

import fs2.Task
import cats.effect.IO
import io.kagera.api.colored._
import io.kagera.api.multiset.{ MultiSet, _ }

Expand All @@ -17,5 +17,5 @@ trait UncoloredTransition[Input, Output, State] extends Transition[Input, Output
}
}

def produceEvent(consume: Marking, state: State, input: Input): Task[Output]
def produceEvent(consume: Marking, state: State, input: Input): IO[Output]
}
13 changes: 8 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ lazy val akka = Project("akka", file("akka"))
defaultProjectSettings ++ Seq(
name := "kagera-akka",
libraryDependencies ++= Seq(
scalaReflect,
"org.scala-lang" % "scala-reflect" % scalaVersion.value,
akkaActor,
akkaPersistence,
akkaSlf4j,
Expand All @@ -56,7 +56,10 @@ lazy val demo = (crossProject(JSPlatform, JVMPlatform) in file("demo"))
.settings(defaultProjectSettings: _*)
.settings(
unmanagedSourceDirectories in Compile += baseDirectory.value / "shared" / "main" / "scala",
libraryDependencies ++= Seq("com.lihaoyi" %%% "scalatags" % "0.4.6", "com.lihaoyi" %%% "upickle" % "0.4.2")
libraryDependencies ++= Seq(
"com.lihaoyi" %%% "scalatags" % "0.9.1",
"com.lihaoyi" %%% "upickle" % "1.1.0"
)
)
.jsSettings(
jsDependencies ++= Seq(
Expand All @@ -65,13 +68,13 @@ lazy val demo = (crossProject(JSPlatform, JVMPlatform) in file("demo"))
minified s"$cytoscapeVersion/dist/cytoscape.min.js"
commonJSName "cytoscape"
),
libraryDependencies ++= Seq("org.scala-js" %%% "scalajs-dom" % "0.8.0")
libraryDependencies ++= Seq("org.scala-js" %%% "scalajs-dom" % "1.0.0")
)
.jvmSettings(
libraryDependencies ++= Seq(
"de.heikoseeberger" %% "akka-http-upickle" % "1.10.1",
"de.heikoseeberger" %% "akka-http-upickle" % "1.32.0",
akkaHttp,
akkaPersistenceQuery,
akkaQuery,
akkaPersistenceCassandra
),
name := "demo-app",
Expand Down
1 change: 1 addition & 0 deletions demo/js/src/main/scala/io/kagera/frontend/Api.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.kagera.frontend
import io.kagera.demo.model.{ PetriNetModel, ProcessState }
import org.scalajs.dom.ext.Ajax
import scala.scalajs.concurrent.JSExecutionContext.Implicits.queue
import upickle.default._

import scala.concurrent.Future

Expand Down
4 changes: 2 additions & 2 deletions demo/js/src/main/scala/io/kagera/frontend/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import io.kagera.frontend.cytoscape._
import org.scalajs.dom.html

import scala.scalajs.concurrent.JSExecutionContext.Implicits.queue
import scala.scalajs.js.annotation.JSExport
import scala.scalajs.js.annotation.{ JSExport, JSExportTopLevel }
import scalatags.JsDom.all._

@JSExport
@JSExportTopLevel("Client")
object Client extends {

@JSExport
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package io.kagera.frontend.cytoscape
import org.scalajs.dom.html

import scala.scalajs.js
import scala.scalajs.js.annotation.JSName
import scala.scalajs.js.annotation.{ JSExportTopLevel, JSGlobal, JSName }

@JSName("cytoscape")
@JSGlobal("cytoscape")
@js.native
protected object CytoScapeJS extends js.Object {

Expand Down
6 changes: 3 additions & 3 deletions demo/jvm/src/main/scala/io/kagera/demo/Queries.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import akka.actor.Actor
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.PersistenceQuery
import akka.stream.scaladsl.Source
import io.kagera.akka.persistence.TransitionFired
import io.kagera.api.colored._
import io.kagera.api.multiset._
import io.kagera.persistence.messages.TransitionFired

trait Queries {

Expand All @@ -16,7 +16,7 @@ trait Queries {
// obtain read journal
val readJournal = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

def allProcessIds: Source[String, NotUsed] = readJournal.allPersistenceIds()
def allProcessIds: Source[String, NotUsed] = readJournal.currentPersistenceIds()

def journalFor(id: String): Source[String, NotUsed] = {
readJournal.currentEventsByPersistenceId(s"process-$id", 0, Long.MaxValue).map {
Expand All @@ -32,7 +32,7 @@ class AggregateMarking[S](topology: ExecutablePetriNet[S]) extends Actor {

def updateMarking(aggregateMarking: MultiSet[Long]): Receive = {

case TransitionFired(_, Some(tid), Some(started), Some(completed), consumed, produced, data)
case TransitionFired(_, _, Some(tid), Some(started), Some(completed), consumed, produced, data)
val minusConsumed = consumed.foldLeft(aggregateMarking) {
case (aggregate, token) aggregate.multisetDecrement(token.placeId.get, token.count.get)
}
Expand Down
1 change: 1 addition & 0 deletions demo/jvm/src/main/scala/io/kagera/demo/http/Routes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import io.kagera.akka.actor.PetriNetInstance
import io.kagera.akka.actor.PetriNetInstanceProtocol._
import io.kagera.api.colored.{ExecutablePetriNet, Generators, Marking}
import io.kagera.demo.{ConfiguredActorSystem, Queries}
import upickle.default._

trait Routes extends Directives with Queries with UpickleSupport {

Expand Down
Loading

0 comments on commit 5cbf303

Please sign in to comment.