Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Massive dependency updates #3

Merged
merged 1 commit into from
May 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package io.kagera.akka.actor
import akka.actor.{ ActorLogging, ActorRef, PoisonPill, Props }
import akka.pattern.pipe
import akka.persistence.PersistentActor
import fs2.Strategy
import io.kagera.akka.actor.PetriNetInstance.Settings
import io.kagera.akka.actor.PetriNetInstanceProtocol._
import io.kagera.api.colored.ExceptionStrategy.RetryWithDelay
Expand All @@ -12,17 +11,18 @@ import io.kagera.api.colored._
import io.kagera.execution.EventSourcing._
import io.kagera.execution._

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.language.existentials

object PetriNetInstance {

case class Settings(
evaluationStrategy: Strategy,
evaluationStrategy: ExecutionContext,
idleTTL: Option[FiniteDuration])

val defaultSettings: Settings = Settings(
evaluationStrategy = Strategy.fromCachedDaemonPool("Kagera.CachedThreadPool"),
evaluationStrategy = ExecutionContext.Implicits.global,
idleTTL = Some(5 minutes)
)

Expand Down Expand Up @@ -152,7 +152,7 @@ class PetriNetInstance[S](
}

def executeJob[E](job: Job[S, E], originalSender: ActorRef) =
runJobAsync(job, executor)(settings.evaluationStrategy).unsafeRunAsyncFuture().pipeTo(context.self)(originalSender)
runJobAsync(job, executor)(settings.evaluationStrategy).unsafeToFuture().pipeTo(context.self)(originalSender)

override def onRecoveryCompleted(instance: Instance[S]) = step(instance)
}
23 changes: 11 additions & 12 deletions akka/src/main/scala/io/kagera/akka/actor/PetriNetInstanceApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import akka.pattern.ask
import akka.stream.scaladsl.{ Sink, Source, SourceQueueWithComplete }
import akka.stream.{ Materializer, OverflowStrategy }
import akka.util.Timeout
import cats.data.Xor
import io.kagera.akka.actor.PetriNetInstanceProtocol._
import io.kagera.api.colored.ExceptionStrategy.RetryWithDelay
import io.kagera.api.colored.{ Transition, _ }
Expand Down Expand Up @@ -79,29 +78,29 @@ class PetriNetInstanceApi[S](topology: ExecutablePetriNet[S], actor: ActorRef)(i
/**
* Fires a transition and confirms (waits) for the result of that transition firing.
*/
def askAndConfirmFirst(msg: Any)(implicit timeout: Timeout): Future[Xor[UnexpectedMessage, InstanceState[S]]] = {
def askAndConfirmFirst(msg: Any)(implicit timeout: Timeout): Future[Either[UnexpectedMessage, InstanceState[S]]] = {
actor.ask(msg).map {
case e: TransitionFired[_] ⇒ Xor.Right(e.result.asInstanceOf[InstanceState[S]])
case msg @ _ ⇒ Xor.Left(UnexpectedMessage(s"Received unexepected message: $msg"))
case e: TransitionFired[_] ⇒ Right(e.result.asInstanceOf[InstanceState[S]])
case msg @ _ ⇒ Left(UnexpectedMessage(s"Received unexepected message: $msg"))
}
}

def askAndConfirmFirstSync(msg: Any)(implicit timeout: Timeout): Xor[UnexpectedMessage, InstanceState[S]] = {
def askAndConfirmFirstSync(msg: Any)(implicit timeout: Timeout): Either[UnexpectedMessage, InstanceState[S]] = {
Await.result(askAndConfirmFirst(topology, msg), timeout.duration)
}

/**
* Fires a transition and confirms (waits) for all responses of subsequent automated transitions.
*/
def askAndConfirmAll(msg: Any, waitForRetries: Boolean = false)(implicit timeout: Timeout): Future[Xor[ErrorResponse, InstanceState[S]]] = {
def askAndConfirmAll(msg: Any, waitForRetries: Boolean = false)(implicit timeout: Timeout): Future[Either[ErrorResponse, InstanceState[S]]] = {

val futureMessages = askAndCollectAll(msg, waitForRetries).runWith(Sink.seq)

futureMessages.map {
_.lastOption match {
case Some(e: TransitionFired[_]) ⇒ Xor.Right(e.result.asInstanceOf[InstanceState[S]])
case Some(msg) ⇒ Xor.Left(UnexpectedMessage(s"Received unexpected message: $msg"))
case None ⇒ Xor.Left(UnknownProcessId)
case Some(e: TransitionFired[_]) ⇒ Right(e.result.asInstanceOf[InstanceState[S]])
case Some(msg) ⇒ Left(UnexpectedMessage(s"Received unexpected message: $msg"))
case None ⇒ Left(UnknownProcessId)
}
}
}
Expand All @@ -127,8 +126,8 @@ class PetriNetInstanceApi[S](topology: ExecutablePetriNet[S], actor: ActorRef)(i
*/
def askAndCollectAll(msg: Any, waitForRetries: Boolean = false): Source[TransitionResponse, NotUsed] = {
askSource[Any](actor, msg, takeWhileNotFailed(topology, waitForRetries)).map {
case e: TransitionResponse ⇒ Xor.Right(e)
case msg @ _ ⇒ Xor.Left(s"Received unexpected message: $msg")
}.takeWhile(_.isRight).map(_.asInstanceOf[Xor.Right[TransitionResponse]].b)
case e: TransitionResponse ⇒ Right(e)
case msg @ _ ⇒ Left(s"Received unexpected message: $msg")
}.takeWhile(_.isRight).map(_.asInstanceOf[Right[NotUsed, TransitionResponse]].value)
}
}
4 changes: 2 additions & 2 deletions akka/src/main/scala/io/kagera/execution/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package io.kagera
import java.io.{ PrintWriter, StringWriter }

import cats.data.State
import fs2.{ Strategy, Task }
import cats.effect.IO
import io.kagera.api._
import io.kagera.api.colored._
import io.kagera.execution.EventSourcing._

import scala.collection.Set
import scala.util.Random
import scala.concurrent.ExecutionContext

package object execution {

Expand Down
4 changes: 2 additions & 2 deletions akka/src/test/scala/io/kagera/akka/PetriNetInstanceSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package io.kagera.akka
import java.util.UUID

import akka.actor.{ ActorSystem, PoisonPill, Terminated }
import fs2.Strategy
import io.kagera.akka.PetriNetInstanceSpec._
import io.kagera.akka.actor.PetriNetInstance
import io.kagera.akka.actor.PetriNetInstance.Settings
Expand All @@ -13,6 +12,7 @@ import io.kagera.api.colored._
import io.kagera.api.colored.dsl._
import org.scalatest.time.{ Milliseconds, Span }

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object PetriNetInstanceSpec {
Expand Down Expand Up @@ -243,7 +243,7 @@ class PetriNetInstanceSpec extends AkkaTestBase {
val ttl = 500 milliseconds

val customSettings = Settings(
evaluationStrategy = Strategy.fromCachedDaemonPool("Kagera.CachedThreadPool"),
evaluationStrategy = ExecutionContext.Implicits.global,
idleTTL = Some(ttl)
)

Expand Down
4 changes: 2 additions & 2 deletions akka/src/test/scala/io/kagera/akka/QuerySpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package io.kagera.akka

import akka.actor.ActorSystem
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.scaladsl.{ AllPersistenceIdsQuery, CurrentEventsByPersistenceIdQuery, ReadJournal }
import akka.persistence.query.scaladsl.{CurrentEventsByPersistenceIdQuery, ReadJournal}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.testkit.{ ImplicitSender, TestKit }
Expand Down Expand Up @@ -32,7 +32,7 @@ class QuerySpec extends TestKit(ActorSystem("QuerySpec", AkkaTestBase.defaultTes

override def readJournal =
PersistenceQuery(system).readJournalFor("inmemory-read-journal")
.asInstanceOf[ReadJournal with CurrentEventsByPersistenceIdQuery with AllPersistenceIdsQuery]
.asInstanceOf[ReadJournal with CurrentEventsByPersistenceIdQuery]

val p1 = Place[Unit](id = 1)
val p2 = Place[Unit](id = 2)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.kagera.api.colored

import fs2.Task
import cats.effect.IO
import io.kagera.api._

trait TransitionExecutor[State] {
Expand All @@ -27,16 +27,16 @@ class TransitionExecutorImpl[State](topology: ColoredPetriNet) extends Transitio
def fireTransition[Input, Output](t: Transition[Input, Output, State]): TransitionFunction[Input, Output, State] = {
(consume, state, input) ⇒

def handleFailure: PartialFunction[Throwable, Task[(Marking, Output)]] = {
case e: Throwable ⇒ Task.fail(e)
def handleFailure: PartialFunction[Throwable, IO[(Marking, Output)]] = {
case e: Throwable ⇒ IO.raiseError(e).asInstanceOf[IO[(Marking, Output)]]
}

if (consume.multiplicities != topology.inMarking(t)) {
Task.fail(new IllegalArgumentException(s"Transition $t may not consume $consume"))
IO.raiseError(new IllegalArgumentException(s"Transition $t may not consume $consume"))
}

try {
transitionFunction(t)(consume, state, input).handleWith { handleFailure }
transitionFunction(t)(consume, state, input).handleErrorWith { handleFailure }
} catch { handleFailure }
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.kagera.api.colored.dsl

import fs2.Task
import cats.effect.IO
import io.kagera.api.colored.ExceptionStrategy.BlockTransition
import io.kagera.api.colored._
import io.kagera.api.colored.transitions.{ AbstractTransition, UncoloredTransition }
Expand All @@ -11,7 +11,7 @@ case class TransitionBehaviour[S, E](automated: Boolean, exceptionHandler: Trans
def asTransition(id: Long, eventSource: S ⇒ E ⇒ S) = new AbstractTransition[Unit, E, S](id, s"t$id", automated, Duration.Undefined, exceptionHandler) with UncoloredTransition[Unit, E, S] {
override val toString = label
override val updateState = eventSource
override def produceEvent(consume: Marking, state: S, input: Unit): Task[E] = Task.delay { (fn(state)) }
override def produceEvent(consume: Marking, state: S, input: Unit): IO[E] = IO.delay { (fn(state)) }
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.kagera.api.colored.dsl

import fs2.Task
import cats.effect.IO
import io.kagera.api.colored.ExceptionStrategy.BlockTransition
import io.kagera.api.colored.transitions.{ AbstractTransition, UncoloredTransition }
import io.kagera.api.colored.{ Marking, Transition, _ }
Expand All @@ -17,7 +17,7 @@ trait StateTransitionNet[S, E] {
new AbstractTransition[Unit, E, S](id, label.getOrElse(s"t$id"), automated, Duration.Undefined, exceptionStrategy) with UncoloredTransition[Unit, E, S] {
override val toString = label
override val updateState = eventSourcing
override def produceEvent(consume: Marking, state: S, input: Unit): Task[E] = Task.delay { (fn(state)) }
override def produceEvent(consume: Marking, state: S, input: Unit): IO[E] = IO.delay { (fn(state)) }
}

def createPetriNet(arcs: Arc*) = process[S](arcs: _*)
Expand Down
4 changes: 2 additions & 2 deletions api/src/main/scala/io/kagera/api/colored/dsl/package.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.kagera.api.colored

import fs2.Task
import cats.effect.IO
import io.kagera.api._
import io.kagera.api.colored.transitions.{ AbstractTransition, IdentityTransition }
import io.kagera.api.multiset._
Expand Down Expand Up @@ -44,7 +44,7 @@ package object dsl {
override val toString = label

override def apply(inAdjacent: MultiSet[Place[_]], outAdjacent: MultiSet[Place[_]]) =
(marking, state, input) ⇒ Task.delay {
(marking, state, input) ⇒ IO.delay {
val produced = outAdjacent.map {
case (place, weight) ⇒ place -> Map(constant -> weight)
}.toMarking
Expand Down
5 changes: 3 additions & 2 deletions api/src/main/scala/io/kagera/api/colored/package.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package io.kagera.api

import fs2.Task
import cats.effect.IO
import io.kagera.api.multiset._

import scala.language.existentials
import scala.language.higherKinds
import scalax.collection.Graph
import scalax.collection.edge.WLDiEdge

Expand Down Expand Up @@ -33,7 +34,7 @@ package object colored {
* @tparam Output The output emitted by the transition.
* @tparam State The state the transition closes over.
*/
type TransitionFunction[Input, Output, State] = (Marking, State, Input) ⇒ Task[(Marking, Output)]
type TransitionFunction[Input, Output, State] = (Marking, State, Input) ⇒ IO[(Marking, Output)]

/**
* An exception handler function associated with a transition.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.kagera.api.colored.transitions

import fs2.Task
import cats.effect.IO
import io.kagera.api.colored._
import io.kagera.api.multiset.{ MultiSet, _ }

Expand All @@ -17,5 +17,5 @@ trait UncoloredTransition[Input, Output, State] extends Transition[Input, Output
}
}

def produceEvent(consume: Marking, state: State, input: Input): Task[Output]
def produceEvent(consume: Marking, state: State, input: Input): IO[Output]
}
13 changes: 8 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ lazy val akka = Project("akka", file("akka"))
defaultProjectSettings ++ Seq(
name := "kagera-akka",
libraryDependencies ++= Seq(
scalaReflect,
"org.scala-lang" % "scala-reflect" % scalaVersion.value,
akkaActor,
akkaPersistence,
akkaSlf4j,
Expand All @@ -56,7 +56,10 @@ lazy val demo = (crossProject(JSPlatform, JVMPlatform) in file("demo"))
.settings(defaultProjectSettings: _*)
.settings(
unmanagedSourceDirectories in Compile += baseDirectory.value / "shared" / "main" / "scala",
libraryDependencies ++= Seq("com.lihaoyi" %%% "scalatags" % "0.4.6", "com.lihaoyi" %%% "upickle" % "0.4.2")
libraryDependencies ++= Seq(
"com.lihaoyi" %%% "scalatags" % "0.9.1",
"com.lihaoyi" %%% "upickle" % "1.1.0"
)
)
.jsSettings(
jsDependencies ++= Seq(
Expand All @@ -65,13 +68,13 @@ lazy val demo = (crossProject(JSPlatform, JVMPlatform) in file("demo"))
minified s"$cytoscapeVersion/dist/cytoscape.min.js"
commonJSName "cytoscape"
),
libraryDependencies ++= Seq("org.scala-js" %%% "scalajs-dom" % "0.8.0")
libraryDependencies ++= Seq("org.scala-js" %%% "scalajs-dom" % "1.0.0")
)
.jvmSettings(
libraryDependencies ++= Seq(
"de.heikoseeberger" %% "akka-http-upickle" % "1.10.1",
"de.heikoseeberger" %% "akka-http-upickle" % "1.32.0",
akkaHttp,
akkaPersistenceQuery,
akkaQuery,
akkaPersistenceCassandra
),
name := "demo-app",
Expand Down
1 change: 1 addition & 0 deletions demo/js/src/main/scala/io/kagera/frontend/Api.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.kagera.frontend
import io.kagera.demo.model.{ PetriNetModel, ProcessState }
import org.scalajs.dom.ext.Ajax
import scala.scalajs.concurrent.JSExecutionContext.Implicits.queue
import upickle.default._

import scala.concurrent.Future

Expand Down
4 changes: 2 additions & 2 deletions demo/js/src/main/scala/io/kagera/frontend/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import io.kagera.frontend.cytoscape._
import org.scalajs.dom.html

import scala.scalajs.concurrent.JSExecutionContext.Implicits.queue
import scala.scalajs.js.annotation.JSExport
import scala.scalajs.js.annotation.{ JSExport, JSExportTopLevel }
import scalatags.JsDom.all._

@JSExport
@JSExportTopLevel("Client")
object Client extends {

@JSExport
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package io.kagera.frontend.cytoscape
import org.scalajs.dom.html

import scala.scalajs.js
import scala.scalajs.js.annotation.JSName
import scala.scalajs.js.annotation.{ JSExportTopLevel, JSGlobal, JSName }

@JSName("cytoscape")
@JSGlobal("cytoscape")
@js.native
protected object CytoScapeJS extends js.Object {

Expand Down
6 changes: 3 additions & 3 deletions demo/jvm/src/main/scala/io/kagera/demo/Queries.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import akka.actor.Actor
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.PersistenceQuery
import akka.stream.scaladsl.Source
import io.kagera.akka.persistence.TransitionFired
import io.kagera.api.colored._
import io.kagera.api.multiset._
import io.kagera.persistence.messages.TransitionFired

trait Queries {

Expand All @@ -16,7 +16,7 @@ trait Queries {
// obtain read journal
val readJournal = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

def allProcessIds: Source[String, NotUsed] = readJournal.allPersistenceIds()
def allProcessIds: Source[String, NotUsed] = readJournal.currentPersistenceIds()

def journalFor(id: String): Source[String, NotUsed] = {
readJournal.currentEventsByPersistenceId(s"process-$id", 0, Long.MaxValue).map {
Expand All @@ -32,7 +32,7 @@ class AggregateMarking[S](topology: ExecutablePetriNet[S]) extends Actor {

def updateMarking(aggregateMarking: MultiSet[Long]): Receive = {

case TransitionFired(_, Some(tid), Some(started), Some(completed), consumed, produced, data) ⇒
case TransitionFired(_, _, Some(tid), Some(started), Some(completed), consumed, produced, data) ⇒
val minusConsumed = consumed.foldLeft(aggregateMarking) {
case (aggregate, token) ⇒ aggregate.multisetDecrement(token.placeId.get, token.count.get)
}
Expand Down
1 change: 1 addition & 0 deletions demo/jvm/src/main/scala/io/kagera/demo/http/Routes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import io.kagera.akka.actor.PetriNetInstance
import io.kagera.akka.actor.PetriNetInstanceProtocol._
import io.kagera.api.colored.{ExecutablePetriNet, Generators, Marking}
import io.kagera.demo.{ConfiguredActorSystem, Queries}
import upickle.default._

trait Routes extends Directives with Queries with UpickleSupport {

Expand Down
Loading