From cb74a326f477aabf6c2468c55463c4583f36941e Mon Sep 17 00:00:00 2001 From: Peter Kotula Date: Sat, 18 Dec 2021 12:08:27 +0100 Subject: [PATCH 01/11] Upgrade to ZIO 2.0 (#152) * version updates * ZIO 2.0.0-RC1 migration * ZIO 2.0.0-RC1 migration - solved PR comments * ZIO 2.0.0-RC1 migration - solved PR comments * Update src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala Co-authored-by: Pierre Ricadat --- README.md | 28 ++++---- build.sbt | 27 +++---- project/plugins.sbt | 3 +- src/main/scala/zio/akka/cluster/Cluster.scala | 22 +++--- .../zio/akka/cluster/pubsub/PubSub.scala | 14 ++-- .../cluster/pubsub/impl/SubscriberImpl.scala | 4 +- .../zio/akka/cluster/sharding/Sharding.scala | 14 ++-- .../zio/akka/cluster/sharding/package.scala | 18 ++--- .../scala/zio/akka/cluster/ClusterSpec.scala | 11 +-- .../zio/akka/cluster/pubsub/PubSubSpec.scala | 26 ++++--- .../akka/cluster/sharding/ShardingSpec.scala | 72 ++++++++++--------- 11 files changed, 124 insertions(+), 115 deletions(-) diff --git a/README.md b/README.md index ff7ab02..9293ddb 100644 --- a/README.md +++ b/README.md @@ -38,13 +38,13 @@ See [Akka Documentation](https://doc.akka.io/docs/akka/current/cluster-usage.htm You can also manually join a cluster using `Cluster.join`. ```scala -def join(seedNodes: List[Address]): ZIO[Has[ActorSystem], Throwable, Unit] +def join(seedNodes: List[Address]): ZIO[ActorSystem, Throwable, Unit] ``` It's possible to get the status of the cluster by calling `Cluster.clusterState` ```scala -val clusterState: ZIO[Has[ActorSystem], Throwable, CurrentClusterState] +val clusterState: ZIO[ActorSystem, Throwable, CurrentClusterState] ``` To monitor the cluster and be informed of changes (e.g. new members, member unreachable, etc), use `Cluster.clusterEvents`. @@ -54,13 +54,13 @@ To unsubscribe, simply `shutdown` the queue. `initialStateAsEvents` indicates if you want to receive previous cluster events leading to the current state, or only future events. ```scala -def clusterEvents(initialStateAsEvents: Boolean = false): ZIO[Has[ActorSystem], Throwable, Queue[ClusterDomainEvent]] +def clusterEvents(initialStateAsEvents: Boolean = false): ZIO[ActorSystem, Throwable, Queue[ClusterDomainEvent]] ``` Finally, you can leave the current cluster using `Cluster.leave`. ```scala -val leave: ZIO[Has[ActorSystem], Throwable, Unit] +val leave: ZIO[ActorSystem, Throwable, Unit] ``` ### Akka PubSub @@ -75,7 +75,7 @@ See [Akka Documentation](https://doc.akka.io/docs/akka/current/distributed-pub-s To create a `PubSub` object which can both publish and subscribe, use `PubSub.createPubSub`. ```scala -def createPubSub[A]: ZIO[Has[ActorSystem], Throwable, PubSub[A]] +def createPubSub[A]: ZIO[ActorSystem, Throwable, PubSub[A]] ``` There are also less powerful variants `PubSub.createPublisher` if you only need to publish and `PubSub.createSubscriber` if you only need to subscribe. @@ -111,11 +111,11 @@ This library wraps messages inside of a `zio.akka.cluster.pubsub.MessageEnvelope ```scala import akka.actor.ActorSystem -import zio.{ Has, Managed, Task, ZLayer } +import zio.{ Managed, Task, ZLayer } import zio.akka.cluster.pubsub.PubSub -val actorSystem: ZLayer[Any, Throwable, Has[ActorSystem]] = - ZLayer.fromManaged(Managed.make(Task(ActorSystem("Test")))(sys => Task.fromFuture(_ => sys.terminate()).either)) +val actorSystem: ZLayer[Any, Throwable, ActorSystem] = + ZLayer.fromManaged(Managed.acquireReleaseWith(Task(ActorSystem("Test")))(sys => Task.fromFuture(_ => sys.terminate()).either)) (for { pubSub <- PubSub.createPubSub[String] @@ -143,7 +143,7 @@ def start[R, Msg, State]( name: String, onMessage: Msg => ZIO[Entity[State] with R, Nothing, Unit], numberOfShards: Int = 100 - ): ZIO[Has[ActorSystem] with R, Throwable, Sharding[Msg]] + ): ZIO[ActorSystem with R, Throwable, Sharding[Msg]] ``` It requires: @@ -176,14 +176,14 @@ This library wraps messages inside of a `zio.akka.cluster.sharding.MessageEnvelo ```scala import akka.actor.ActorSystem import zio.akka.cluster.sharding.{ Entity, Sharding } -import zio.{ Has, Managed, Task, ZIO, ZLayer } +import zio.{ Managed, Task, ZIO, ZLayer } -val actorSystem: ZLayer[Any, Throwable, Has[ActorSystem]] = - ZLayer.fromManaged(Managed.make(Task(ActorSystem("Test")))(sys => Task.fromFuture(_ => sys.terminate()).either)) +val actorSystem: ZLayer[Any, Throwable, ActorSystem] = + ZLayer.fromManaged(Managed.acquireReleaseWith(Task(ActorSystem("Test")))(sys => Task.fromFuture(_ => sys.terminate()).either)) val behavior: String => ZIO[Entity[Int], Nothing, Unit] = { - case "+" => ZIO.accessM[Entity[Int]](_.get.state.update(x => Some(x.getOrElse(0) + 1))) - case "-" => ZIO.accessM[Entity[Int]](_.get.state.update(x => Some(x.getOrElse(0) - 1))) + case "+" => ZIO.serviceWithZIO[Entity[Int]](_.state.update(x => Some(x.getOrElse(0) + 1))) + case "-" => ZIO.serviceWithZIO[Entity[Int]](_.state.update(x => Some(x.getOrElse(0) - 1))) case _ => ZIO.unit } diff --git a/build.sbt b/build.sbt index ae3848b..cb74903 100644 --- a/build.sbt +++ b/build.sbt @@ -1,13 +1,16 @@ -val mainScala = "2.13.1" -val allScala = Seq("2.11.12", "2.12.10", mainScala) +val mainScala = "2.13.7" +val allScala = Seq("2.11.12", "2.12.15", mainScala) + +val zioVersion = "2.0.0-RC1" +val akkaVersion = "2.5.32" organization := "dev.zio" homepage := Some(url("https://github.com/zio/zio-akka-cluster")) name := "zio-akka-cluster" licenses := List("Apache-2.0" -> url("http://www.apache.org/licenses/LICENSE-2.0")) scalaVersion := mainScala -parallelExecution in Test := false -fork in Test := true +Test / parallelExecution := false +Test / fork := true pgpPublicRing := file("/tmp/public.asc") pgpSecretRing := file("/tmp/secret.asc") scmInfo := Some( @@ -23,13 +26,13 @@ developers := List( ) libraryDependencies ++= Seq( - "dev.zio" %% "zio" % "1.0.13", - "dev.zio" %% "zio-streams" % "1.0.13", - "com.typesafe.akka" %% "akka-cluster-tools" % "2.5.32", - "com.typesafe.akka" %% "akka-cluster-sharding" % "2.5.32", - "dev.zio" %% "zio-test" % "1.0.13" % "test", - "dev.zio" %% "zio-test-sbt" % "1.0.13" % "test", - compilerPlugin("org.typelevel" %% "kind-projector" % "0.10.3"), + "dev.zio" %% "zio" % zioVersion, + "dev.zio" %% "zio-streams" % zioVersion, + "com.typesafe.akka" %% "akka-cluster-tools" % akkaVersion, + "com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion, + "dev.zio" %% "zio-test" % zioVersion % "test", + "dev.zio" %% "zio-test-sbt" % zioVersion % "test", + compilerPlugin("org.typelevel" %% "kind-projector" % "0.13.2" cross CrossVersion.full), compilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1") ) @@ -78,7 +81,7 @@ scalacOptions ++= Seq( case _ => Nil }) -fork in run := true +run / fork := true crossScalaVersions := allScala diff --git a/project/plugins.sbt b/project/plugins.sbt index ea48e2a..53e1a14 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,2 +1,3 @@ -addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.0") +addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.5") addSbtPlugin("com.geirsson" % "sbt-ci-release" % "1.5.7") +addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.9.33") diff --git a/src/main/scala/zio/akka/cluster/Cluster.scala b/src/main/scala/zio/akka/cluster/Cluster.scala index e347cd6..cb459a7 100644 --- a/src/main/scala/zio/akka/cluster/Cluster.scala +++ b/src/main/scala/zio/akka/cluster/Cluster.scala @@ -3,20 +3,20 @@ package zio.akka.cluster import akka.actor.{ Actor, ActorSystem, Address, PoisonPill, Props } import akka.cluster.ClusterEvent._ import zio.Exit.{ Failure, Success } -import zio.{ Has, Queue, Runtime, Task, ZIO } +import zio.{ Queue, Runtime, Task, ZIO } object Cluster { - private val cluster: ZIO[Has[ActorSystem], Throwable, akka.cluster.Cluster] = + private val cluster: ZIO[ActorSystem, Throwable, akka.cluster.Cluster] = for { - actorSystem <- ZIO.access[Has[ActorSystem]](_.get) + actorSystem <- ZIO.service[ActorSystem] cluster <- Task(akka.cluster.Cluster(actorSystem)) } yield cluster /** * Returns the current state of the cluster. */ - val clusterState: ZIO[Has[ActorSystem], Throwable, CurrentClusterState] = + val clusterState: ZIO[ActorSystem, Throwable, CurrentClusterState] = for { cluster <- cluster state <- Task(cluster.state) @@ -25,7 +25,7 @@ object Cluster { /** * Joins a cluster using the provided seed nodes. */ - def join(seedNodes: List[Address]): ZIO[Has[ActorSystem], Throwable, Unit] = + def join(seedNodes: List[Address]): ZIO[ActorSystem, Throwable, Unit] = for { cluster <- cluster _ <- Task(cluster.joinSeedNodes(seedNodes)) @@ -34,7 +34,7 @@ object Cluster { /** * Leaves the current cluster. */ - val leave: ZIO[Has[ActorSystem], Throwable, Unit] = + val leave: ZIO[ActorSystem, Throwable, Unit] = for { cluster <- cluster _ <- Task(cluster.leave(cluster.selfAddress)) @@ -48,7 +48,7 @@ object Cluster { */ def clusterEvents( initialStateAsEvents: Boolean = false - ): ZIO[Has[ActorSystem], Throwable, Queue[ClusterDomainEvent]] = + ): ZIO[ActorSystem, Throwable, Queue[ClusterDomainEvent]] = Queue.unbounded[ClusterDomainEvent].tap(clusterEventsWith(_, initialStateAsEvents)) /** @@ -59,10 +59,10 @@ object Cluster { def clusterEventsWith( queue: Queue[ClusterDomainEvent], initialStateAsEvents: Boolean = false - ): ZIO[Has[ActorSystem], Throwable, Unit] = + ): ZIO[ActorSystem, Throwable, Unit] = for { rts <- Task.runtime - actorSystem <- ZIO.access[Has[ActorSystem]](_.get) + actorSystem <- ZIO.service[ActorSystem] _ <- Task(actorSystem.actorOf(Props(new SubscriberActor(rts, queue, initialStateAsEvents)))) } yield () @@ -78,9 +78,9 @@ object Cluster { def receive: PartialFunction[Any, Unit] = { case ev: ClusterDomainEvent => - rts.unsafeRunAsync(queue.offer(ev)) { + rts.unsafeRunAsyncWith(queue.offer(ev)) { case Success(_) => () - case Failure(cause) => if (cause.interrupted) self ! PoisonPill // stop listening if the queue was shut down + case Failure(cause) => if (cause.isInterrupted) self ! PoisonPill // stop listening if the queue was shut down } case _ => } diff --git a/src/main/scala/zio/akka/cluster/pubsub/PubSub.scala b/src/main/scala/zio/akka/cluster/pubsub/PubSub.scala index 2aacde1..b34f2a8 100644 --- a/src/main/scala/zio/akka/cluster/pubsub/PubSub.scala +++ b/src/main/scala/zio/akka/cluster/pubsub/PubSub.scala @@ -3,7 +3,7 @@ package zio.akka.cluster.pubsub import akka.actor.{ ActorRef, ActorSystem } import akka.cluster.pubsub.DistributedPubSub import zio.akka.cluster.pubsub.impl.{ PublisherImpl, SubscriberImpl } -import zio.{ Has, Queue, Task, ZIO } +import zio.{ Queue, Task, ZIO } /** * A `Publisher[A]` is able to send messages of type `A` through Akka PubSub. @@ -35,9 +35,9 @@ object PubSub { /** * Creates a new `Publisher[A]`. */ - def createPublisher[A]: ZIO[Has[ActorSystem], Throwable, Publisher[A]] = + def createPublisher[A]: ZIO[ActorSystem, Throwable, Publisher[A]] = for { - actorSystem <- ZIO.access[Has[ActorSystem]](_.get) + actorSystem <- ZIO.service[ActorSystem] mediator <- getMediator(actorSystem) } yield new Publisher[A] with PublisherImpl[A] { override val getMediator: ActorRef = mediator @@ -46,9 +46,9 @@ object PubSub { /** * Creates a new `Subscriber[A]`. */ - def createSubscriber[A]: ZIO[Has[ActorSystem], Throwable, Subscriber[A]] = + def createSubscriber[A]: ZIO[ActorSystem, Throwable, Subscriber[A]] = for { - actorSystem <- ZIO.access[Has[ActorSystem]](_.get) + actorSystem <- ZIO.service[ActorSystem] mediator <- getMediator(actorSystem) } yield new Subscriber[A] with SubscriberImpl[A] { override val getActorSystem: ActorSystem = actorSystem @@ -58,9 +58,9 @@ object PubSub { /** * Creates a new `PubSub[A]`. */ - def createPubSub[A]: ZIO[Has[ActorSystem], Throwable, PubSub[A]] = + def createPubSub[A]: ZIO[ActorSystem, Throwable, PubSub[A]] = for { - actorSystem <- ZIO.access[Has[ActorSystem]](_.get) + actorSystem <- ZIO.service[ActorSystem] mediator <- getMediator(actorSystem) } yield new PubSub[A] with PublisherImpl[A] with SubscriberImpl[A] { override val getActorSystem: ActorSystem = actorSystem diff --git a/src/main/scala/zio/akka/cluster/pubsub/impl/SubscriberImpl.scala b/src/main/scala/zio/akka/cluster/pubsub/impl/SubscriberImpl.scala index 561b591..643ddfe 100644 --- a/src/main/scala/zio/akka/cluster/pubsub/impl/SubscriberImpl.scala +++ b/src/main/scala/zio/akka/cluster/pubsub/impl/SubscriberImpl.scala @@ -39,9 +39,9 @@ object SubscriberImpl { rts.unsafeRunSync(subscribed.succeed(())) () case MessageEnvelope(msg) => - rts.unsafeRunAsync(queue.offer(msg.asInstanceOf[A])) { + rts.unsafeRunAsyncWith(queue.offer(msg.asInstanceOf[A])) { case Success(_) => () - case Failure(cause) => if (cause.interrupted) self ! PoisonPill // stop listening if the queue was shut down + case Failure(cause) => if (cause.isInterrupted) self ! PoisonPill // stop listening if the queue was shut down } } } diff --git a/src/main/scala/zio/akka/cluster/sharding/Sharding.scala b/src/main/scala/zio/akka/cluster/sharding/Sharding.scala index 7558aa6..6f35169 100644 --- a/src/main/scala/zio/akka/cluster/sharding/Sharding.scala +++ b/src/main/scala/zio/akka/cluster/sharding/Sharding.scala @@ -9,7 +9,7 @@ import akka.pattern.{ ask => askPattern } import akka.util.Timeout import zio.akka.cluster.sharding import zio.akka.cluster.sharding.MessageEnvelope.{ MessagePayload, PassivatePayload, PoisonPillPayload } -import zio.{ =!=, Has, Ref, Runtime, Tag, Task, UIO, ZIO, ZLayer } +import zio.{ =!=, Ref, Runtime, Tag, Task, UIO, ZIO, ZLayer } /** * A `Sharding[M]` is able to send messages of type `M` to a sharded entity or to stop one. @@ -37,14 +37,14 @@ object Sharding { * @param askTimeout a finite duration specifying how long an ask is allowed to wait for an entity to respond * @return a [[Sharding]] object that can be used to send messages to sharded entities */ - def start[R <: Has[_], Msg, State: Tag]( + def start[R, Msg, State: Tag]( name: String, onMessage: Msg => ZIO[Entity[State] with R, Nothing, Unit], numberOfShards: Int = 100, askTimeout: FiniteDuration = 10.seconds - ): ZIO[Has[ActorSystem] with R, Throwable, Sharding[Msg]] = + ): ZIO[ActorSystem with R, Throwable, Sharding[Msg]] = for { - rts <- ZIO.runtime[Has[ActorSystem] with R] + rts <- ZIO.runtime[ActorSystem with R] actorSystem = rts.environment.get[ActorSystem] shardingRegion <- Task( ClusterSharding(actorSystem).start( @@ -83,9 +83,9 @@ object Sharding { role: Option[String], numberOfShards: Int = 100, askTimeout: FiniteDuration = 10.seconds - ): ZIO[Has[ActorSystem], Throwable, Sharding[Msg]] = + ): ZIO[ActorSystem, Throwable, Sharding[Msg]] = for { - rts <- ZIO.runtime[Has[ActorSystem]] + rts <- ZIO.runtime[ActorSystem] actorSystem = rts.environment.get shardingRegion <- Task( ClusterSharding(actorSystem).startProxy( @@ -129,7 +129,7 @@ object Sharding { ) } - private[sharding] class ShardEntity[R <: Has[_], Msg, State: Tag](rts: Runtime[R])( + private[sharding] class ShardEntity[R, Msg, State: Tag](rts: Runtime[R])( onMessage: Msg => ZIO[Entity[State] with R, Nothing, Unit] ) extends Actor { diff --git a/src/main/scala/zio/akka/cluster/sharding/package.scala b/src/main/scala/zio/akka/cluster/sharding/package.scala index 21786ca..18677c3 100644 --- a/src/main/scala/zio/akka/cluster/sharding/package.scala +++ b/src/main/scala/zio/akka/cluster/sharding/package.scala @@ -1,12 +1,12 @@ package zio.akka.cluster import akka.actor.ActorContext -import zio.{ Has, Ref, Tag, Task, UIO, URIO, ZIO } +import zio.{ Ref, Tag, Task, UIO, URIO, ZIO } import scala.concurrent.duration.Duration package object sharding { - type Entity[State] = Has[Entity.Service[State]] + type Entity[State] = Entity.Service[State] object Entity { @@ -21,19 +21,19 @@ package object sharding { } def replyToSender[State: Tag, R](msg: R): ZIO[Entity[State], Throwable, Unit] = - ZIO.accessM[Entity[State]](_.get.replyToSender(msg)) + ZIO.serviceWithZIO[Entity[State]](_.replyToSender(msg)) def context[State: Tag]: URIO[Entity[State], ActorContext] = - ZIO.access[Entity[State]](_.get.context) + ZIO.service[Entity[State]].map(_.context) def id[State: Tag]: URIO[Entity[State], String] = - ZIO.access[Entity[State]](_.get.id) + ZIO.service[Entity[State]].map(_.id) def state[State: Tag]: URIO[Entity[State], Ref[Option[State]]] = - ZIO.access[Entity[State]](_.get.state) + ZIO.service[Entity[State]].map(_.state) def stop[State: Tag]: ZIO[Entity[State], Nothing, Unit] = - ZIO.accessM[Entity[State]](_.get.stop) + ZIO.serviceWithZIO[Entity[State]](_.stop) def passivate[State: Tag]: ZIO[Entity[State], Nothing, Unit] = - ZIO.accessM[Entity[State]](_.get.passivate) + ZIO.serviceWithZIO[Entity[State]](_.passivate) def passivateAfter[State: Tag](duration: Duration): ZIO[Entity[State], Nothing, Unit] = - ZIO.accessM[Entity[State]](_.get.passivateAfter(duration)) + ZIO.serviceWithZIO[Entity[State]](_.passivateAfter(duration)) } } diff --git a/src/test/scala/zio/akka/cluster/ClusterSpec.scala b/src/test/scala/zio/akka/cluster/ClusterSpec.scala index 93976be..f561e17 100644 --- a/src/test/scala/zio/akka/cluster/ClusterSpec.scala +++ b/src/test/scala/zio/akka/cluster/ClusterSpec.scala @@ -5,14 +5,15 @@ import akka.cluster.ClusterEvent.MemberLeft import com.typesafe.config.{ Config, ConfigFactory } import zio.test.Assertion._ import zio.test._ -import zio.test.environment.TestEnvironment +import zio.test.TestEnvironment +import zio.test.ZIOSpecDefault import zio.{ Managed, Task, ZLayer } -object ClusterSpec extends DefaultRunnableSpec { +object ClusterSpec extends ZIOSpecDefault { def spec: ZSpec[TestEnvironment, Any] = suite("ClusterSpec")( - testM("receive cluster events") { + test("receive cluster events") { val config: Config = ConfigFactory.parseString(s""" |akka { | actor { @@ -31,7 +32,9 @@ object ClusterSpec extends DefaultRunnableSpec { """.stripMargin) val actorSystem: Managed[Throwable, ActorSystem] = - Managed.make(Task(ActorSystem("Test", config)))(sys => Task.fromFuture(_ => sys.terminate()).either) + Managed.acquireReleaseWith(Task(ActorSystem("Test", config)))(sys => + Task.fromFuture(_ => sys.terminate()).either + ) assertM( for { diff --git a/src/test/scala/zio/akka/cluster/pubsub/PubSubSpec.scala b/src/test/scala/zio/akka/cluster/pubsub/PubSubSpec.scala index 731e982..b4a3598 100644 --- a/src/test/scala/zio/akka/cluster/pubsub/PubSubSpec.scala +++ b/src/test/scala/zio/akka/cluster/pubsub/PubSubSpec.scala @@ -4,10 +4,11 @@ import akka.actor.ActorSystem import com.typesafe.config.{ Config, ConfigFactory } import zio.test.Assertion._ import zio.test._ -import zio.test.environment.TestEnvironment -import zio.{ ExecutionStrategy, Has, Managed, Task, ZLayer } +import zio.test.TestEnvironment +import zio.test.ZIOSpecDefault +import zio.{ ExecutionStrategy, Managed, Task, ZLayer } -object PubSubSpec extends DefaultRunnableSpec { +object PubSubSpec extends ZIOSpecDefault { val config: Config = ConfigFactory.parseString(s""" |akka { @@ -26,9 +27,9 @@ object PubSubSpec extends DefaultRunnableSpec { |} """.stripMargin) - val actorSystem: ZLayer[Any, Throwable, Has[ActorSystem]] = + val actorSystem: ZLayer[Any, Throwable, ActorSystem] = ZLayer.fromManaged( - Managed.make(Task(ActorSystem("Test", config)))(sys => Task.fromFuture(_ => sys.terminate()).either) + Managed.acquireReleaseWith(Task(ActorSystem("Test", config)))(sys => Task.fromFuture(_ => sys.terminate()).either) ) val topic = "topic" @@ -36,7 +37,7 @@ object PubSubSpec extends DefaultRunnableSpec { def spec: ZSpec[TestEnvironment, Any] = suite("PubSubSpec")( - testM("send and receive a single message") { + test("send and receive a single message") { assertM( for { pubSub <- PubSub.createPubSub[String] @@ -46,7 +47,7 @@ object PubSubSpec extends DefaultRunnableSpec { } yield item )(equalTo(msg)).provideLayer(actorSystem) }, - testM("support multiple subscribers") { + test("support multiple subscribers") { assertM( for { pubSub <- PubSub.createPubSub[String] @@ -58,7 +59,7 @@ object PubSubSpec extends DefaultRunnableSpec { } yield (item1, item2) )(equalTo((msg, msg))).provideLayer(actorSystem) }, - testM("support multiple publishers") { + test("support multiple publishers") { val msg2 = "what's up" assertM( for { @@ -71,7 +72,7 @@ object PubSubSpec extends DefaultRunnableSpec { } yield (item1, item2) )(equalTo((msg, msg2))).provideLayer(actorSystem) }, - testM("send only one message to a single group") { + test("send only one message to a single group") { val group = "group" assertM( for { @@ -84,7 +85,7 @@ object PubSubSpec extends DefaultRunnableSpec { } yield (item, sizes) )(equalTo((msg, (0, 0)))).provideLayer(actorSystem) }, - testM("send one message to each group") { + test("send one message to each group") { val group1 = "group1" val group2 = "group2" assertM( @@ -98,8 +99,5 @@ object PubSubSpec extends DefaultRunnableSpec { } yield List(item1, item2) )(equalTo(List(msg, msg))).provideLayer(actorSystem) } - ) - - override def aspects: List[TestAspect[Nothing, TestEnvironment, Nothing, Any]] = - List(TestAspect.executionStrategy(ExecutionStrategy.Sequential)) + ) @@ TestAspect.executionStrategy(ExecutionStrategy.Sequential) } diff --git a/src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala b/src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala index 253243a..c73ac73 100644 --- a/src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala +++ b/src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala @@ -3,14 +3,14 @@ package zio.akka.cluster.sharding import scala.language.postfixOps import akka.actor.ActorSystem import com.typesafe.config.{ Config, ConfigFactory } -import zio.clock.Clock -import zio.duration._ import zio.test.Assertion._ import zio.test._ -import zio.test.environment.TestEnvironment -import zio.{ ExecutionStrategy, Has, Managed, Promise, Task, UIO, ZIO, ZLayer } +import zio.test.TestEnvironment +import zio.test.ZIOSpecDefault +import zio.{ ExecutionStrategy, Managed, Promise, Task, UIO, ZIO, ZLayer } +import zio._ -object ShardingSpec extends DefaultRunnableSpec { +object ShardingSpec extends ZIOSpecDefault { val config: Config = ConfigFactory.parseString(s""" |akka { @@ -30,9 +30,9 @@ object ShardingSpec extends DefaultRunnableSpec { |} """.stripMargin) - val actorSystem: ZLayer[Any, Throwable, Has[ActorSystem]] = + val actorSystem: ZLayer[Any, Throwable, ActorSystem] = ZLayer.fromManaged( - Managed.make(Task(ActorSystem("Test", config)))(sys => Task.fromFuture(_ => sys.terminate()).either) + Managed.acquireReleaseWith(Task(ActorSystem("Test", config)))(sys => Task.fromFuture(_ => sys.terminate()).either) ) val config2: Config = ConfigFactory.parseString(s""" @@ -53,9 +53,11 @@ object ShardingSpec extends DefaultRunnableSpec { |} """.stripMargin) - val actorSystem2: ZLayer[Any, Throwable, Has[ActorSystem]] = + val actorSystem2: ZLayer[Any, Throwable, ActorSystem] = ZLayer.fromManaged( - Managed.make(Task(ActorSystem("Test", config2)))(sys => Task.fromFuture(_ => sys.terminate()).either) + Managed.acquireReleaseWith(Task(ActorSystem("Test", config2)))(sys => + Task.fromFuture(_ => sys.terminate()).either + ) ) val shardId = "shard" @@ -64,7 +66,7 @@ object ShardingSpec extends DefaultRunnableSpec { def spec: ZSpec[TestEnvironment, Any] = suite("ShardingSpec")( - testM("send and receive a single message") { + test("send and receive a single message") { assertM( for { p <- Promise.make[Nothing, String] @@ -75,9 +77,9 @@ object ShardingSpec extends DefaultRunnableSpec { } yield res )(equalTo(msg)).provideLayer(actorSystem) }, - testM("send and receive a message using ask") { + test("send and receive a message using ask") { val onMessage: String => ZIO[Entity[Any], Nothing, Unit] = - incomingMsg => ZIO.accessM[Entity[Any]](r => r.get.replyToSender(incomingMsg).orDie) + incomingMsg => ZIO.serviceWithZIO[Entity[Any]](_.replyToSender(incomingMsg).orDie) assertM( for { sharding <- Sharding.start(shardName, onMessage) @@ -85,13 +87,13 @@ object ShardingSpec extends DefaultRunnableSpec { } yield reply )(equalTo(msg)).provideLayer(actorSystem) }, - testM("gather state") { + test("gather state") { assertM( for { p <- Promise.make[Nothing, Boolean] onMessage = (_: String) => for { - state <- ZIO.access[Entity[Int]](_.get.state) + state <- ZIO.serviceWith[Entity[Int]](_.state) newState <- state.updateAndGet { case None => Some(1) case Some(x) => Some(x + 1) @@ -109,15 +111,16 @@ object ShardingSpec extends DefaultRunnableSpec { } yield (earlyPoll, res) )(equalTo((None, true))).provideLayer(actorSystem) }, - testM("kill itself") { + test("kill itself") { assertM( for { p <- Promise.make[Nothing, Option[Unit]] onMessage = (msg: String) => msg match { - case "set" => ZIO.accessM[Entity[Unit]](_.get.state.set(Some(()))) - case "get" => ZIO.accessM[Entity[Unit]](_.get.state.get.flatMap(s => p.succeed(s).unit)) - case "die" => ZIO.accessM[Entity[Unit]](_.get.stop) + case "set" => ZIO.serviceWithZIO[Entity[Unit]](_.state.set(Some(()))) + case "get" => + ZIO.serviceWithZIO[Entity[Unit]](_.state.get.flatMap(s => p.succeed(s).unit)) + case "die" => ZIO.serviceWithZIO[Entity[Unit]](_.stop) } sharding <- Sharding.start(shardName, onMessage) _ <- sharding.send(shardId, "set") @@ -131,14 +134,15 @@ object ShardingSpec extends DefaultRunnableSpec { } yield res )(isNone).provideLayer(actorSystem) }, - testM("passivate") { + test("passivate") { assertM( for { p <- Promise.make[Nothing, Option[Unit]] onMessage = (msg: String) => msg match { - case "set" => ZIO.accessM[Entity[Unit]](_.get.state.set(Some(()))) - case "get" => ZIO.accessM[Entity[Unit]](_.get.state.get.flatMap(s => p.succeed(s).unit)) + case "set" => ZIO.serviceWithZIO[Entity[Unit]](_.state.set(Some(()))) + case "get" => + ZIO.serviceWithZIO[Entity[Unit]](_.state.get.flatMap(s => p.succeed(s).unit)) } sharding <- Sharding.start(shardName, onMessage) _ <- sharding.send(shardId, "set") @@ -152,15 +156,17 @@ object ShardingSpec extends DefaultRunnableSpec { } yield res )(isNone).provideLayer(actorSystem) }, - testM("passivateAfter") { + test("passivateAfter") { assertM( for { p <- Promise.make[Nothing, Option[Unit]] onMessage = (msg: String) => msg match { - case "set" => ZIO.accessM[Entity[Unit]](_.get.state.set(Some(()))) - case "get" => ZIO.accessM[Entity[Unit]](_.get.state.get.flatMap(s => p.succeed(s).unit)) - case "timeout" => ZIO.accessM[Entity[Unit]](_.get.passivateAfter((1 millisecond).asScala)) + case "set" => ZIO.serviceWithZIO[Entity[Unit]](_.state.set(Some(()))) + case "get" => + ZIO.serviceWithZIO[Entity[Unit]](_.state.get.flatMap(s => p.succeed(s).unit)) + case "timeout" => + ZIO.serviceWithZIO[Entity[Unit]](_.passivateAfter((1 millisecond).asScala)) } sharding <- Sharding.start(shardName, onMessage) _ <- sharding.send(shardId, "set") @@ -174,7 +180,7 @@ object ShardingSpec extends DefaultRunnableSpec { } yield res )(isNone).provideLayer(actorSystem) }, - testM("work with 2 actor systems") { + test("work with 2 actor systems") { assertM( actorSystem.build.use(a1 => actorSystem2.build.use(a2 => @@ -183,8 +189,8 @@ object ShardingSpec extends DefaultRunnableSpec { p2 <- Promise.make[Nothing, Unit] onMessage1 = (_: String) => p1.succeed(()).unit onMessage2 = (_: String) => p2.succeed(()).unit - sharding1 <- Sharding.start(shardName, onMessage1).provideLayer(ZLayer.succeedMany(a1)) - _ <- Sharding.start(shardName, onMessage2).provideLayer(ZLayer.succeedMany(a2)) + sharding1 <- Sharding.start(shardName, onMessage1).provideEnvironment(a1) + _ <- Sharding.start(shardName, onMessage2).provideEnvironment(a2) _ <- sharding1.send("1", "hi") _ <- sharding1.send("2", "hi") _ <- p1.await @@ -194,12 +200,12 @@ object ShardingSpec extends DefaultRunnableSpec { ) )(isUnit) }, - testM("provide proper environment to onMessage") { + test("provide proper environment to onMessage") { trait TestService { def doSomething(): UIO[String] } def doSomething = - ZIO.accessM[Has[TestService]](_.get.doSomething()) + ZIO.serviceWithZIO[TestService](_.doSomething()) val l = ZLayer.succeed(new TestService { override def doSomething(): UIO[String] = UIO("test") @@ -208,15 +214,13 @@ object ShardingSpec extends DefaultRunnableSpec { assertM( for { p <- Promise.make[Nothing, String] - onMessage = (_: String) => (doSomething >>= p.succeed).unit + onMessage = (_: String) => (doSomething flatMap p.succeed).unit sharding <- Sharding.start(shardName, onMessage) _ <- sharding.send(shardId, msg) res <- p.await } yield res )(equalTo("test")).provideLayer(actorSystem ++ l) } - ) + ) @@ TestAspect.executionStrategy(ExecutionStrategy.Sequential) @@ TestAspect.timeout(30.seconds) - override def aspects: List[TestAspect[Nothing, TestEnvironment, Nothing, Any]] = - List(TestAspect.executionStrategy(ExecutionStrategy.Sequential), TestAspect.timeout(30.seconds)) } From 5cb8e57d83f58d2cc562743dba1ab3290791f072 Mon Sep 17 00:00:00 2001 From: Pierre Ricadat Date: Sat, 18 Dec 2021 12:24:58 +0100 Subject: [PATCH 02/11] Publish for 2.x branch --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 48e3336..00e4570 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -3,7 +3,7 @@ name: CI on: pull_request: push: - branches: ['master'] + branches: ['master', 'series/2.x'] release: types: - published From 694cfc46184552ac123a0f715a75c1777457bc6a Mon Sep 17 00:00:00 2001 From: Peter Kotula Date: Wed, 5 Jan 2022 11:16:54 +0100 Subject: [PATCH 03/11] ZIO2 - Akka 2.6 (#155) * akka 2.6.18 * akka 2.6.18 * akka 2.6.18 * akka 2.6.18 * akka 2.6.18 --- build.sbt | 14 ++------------ src/main/scala/zio/akka/cluster/Cluster.scala | 5 +++-- .../cluster/pubsub/impl/SubscriberImpl.scala | 3 ++- .../scala/zio/akka/cluster/ClusterSpec.scala | 19 ++++++++++++++----- .../zio/akka/cluster/pubsub/PubSubSpec.scala | 6 ++++-- .../akka/cluster/sharding/ShardingSpec.scala | 16 ++++++++++------ 6 files changed, 35 insertions(+), 28 deletions(-) diff --git a/build.sbt b/build.sbt index cb74903..155bb6f 100644 --- a/build.sbt +++ b/build.sbt @@ -1,8 +1,8 @@ val mainScala = "2.13.7" -val allScala = Seq("2.11.12", "2.12.15", mainScala) +val allScala = Seq("2.12.15", mainScala) val zioVersion = "2.0.0-RC1" -val akkaVersion = "2.5.32" +val akkaVersion = "2.6.18" organization := "dev.zio" homepage := Some(url("https://github.com/zio/zio-akka-cluster")) @@ -53,16 +53,6 @@ scalacOptions ++= Seq( "-Ywarn-unused", "-Ywarn-value-discard" ) ++ (CrossVersion.partialVersion(scalaVersion.value) match { - case Some((2, 11)) => - Seq( - "-Yno-adapted-args", - "-Ypartial-unification", - "-Ywarn-inaccessible", - "-Ywarn-infer-any", - "-Ywarn-nullary-override", - "-Ywarn-nullary-unit", - "-Xfuture" - ) case Some((2, 12)) => Seq( "-Xsource:2.13", diff --git a/src/main/scala/zio/akka/cluster/Cluster.scala b/src/main/scala/zio/akka/cluster/Cluster.scala index cb459a7..38ba7c2 100644 --- a/src/main/scala/zio/akka/cluster/Cluster.scala +++ b/src/main/scala/zio/akka/cluster/Cluster.scala @@ -76,13 +76,14 @@ object Cluster { if (initialStateAsEvents) InitialStateAsEvents else InitialStateAsSnapshot akka.cluster.Cluster(context.system).subscribe(self, initialState, classOf[ClusterDomainEvent]) - def receive: PartialFunction[Any, Unit] = { + def receive: Actor.Receive = { case ev: ClusterDomainEvent => rts.unsafeRunAsyncWith(queue.offer(ev)) { case Success(_) => () case Failure(cause) => if (cause.isInterrupted) self ! PoisonPill // stop listening if the queue was shut down } - case _ => + () + case _ => () } } diff --git a/src/main/scala/zio/akka/cluster/pubsub/impl/SubscriberImpl.scala b/src/main/scala/zio/akka/cluster/pubsub/impl/SubscriberImpl.scala index 643ddfe..bfbb178 100644 --- a/src/main/scala/zio/akka/cluster/pubsub/impl/SubscriberImpl.scala +++ b/src/main/scala/zio/akka/cluster/pubsub/impl/SubscriberImpl.scala @@ -34,7 +34,7 @@ object SubscriberImpl { mediator ! Subscribe(topic, group, self) - def receive: PartialFunction[Any, Unit] = { + def receive: Actor.Receive = { case SubscribeAck(_) => rts.unsafeRunSync(subscribed.succeed(())) () @@ -43,6 +43,7 @@ object SubscriberImpl { case Success(_) => () case Failure(cause) => if (cause.isInterrupted) self ! PoisonPill // stop listening if the queue was shut down } + () } } } diff --git a/src/test/scala/zio/akka/cluster/ClusterSpec.scala b/src/test/scala/zio/akka/cluster/ClusterSpec.scala index f561e17..fb6ca86 100644 --- a/src/test/scala/zio/akka/cluster/ClusterSpec.scala +++ b/src/test/scala/zio/akka/cluster/ClusterSpec.scala @@ -3,6 +3,7 @@ package zio.akka.cluster import akka.actor.ActorSystem import akka.cluster.ClusterEvent.MemberLeft import com.typesafe.config.{ Config, ConfigFactory } +import zio.stream.ZStream import zio.test.Assertion._ import zio.test._ import zio.test.TestEnvironment @@ -20,13 +21,15 @@ object ClusterSpec extends ZIOSpecDefault { | provider = "cluster" | } | remote { - | netty.tcp { + | enabled-transports = ["akka.remote.artery.canonical"] + | artery.canonical { | hostname = "127.0.0.1" | port = 2551 | } | } | cluster { - | seed-nodes = ["akka.tcp://Test@127.0.0.1:2551"] + | seed-nodes = ["akka://Test@127.0.0.1:2551"] + | downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider" | } |} """.stripMargin) @@ -40,9 +43,15 @@ object ClusterSpec extends ZIOSpecDefault { for { queue <- Cluster.clusterEvents() _ <- Cluster.leave - item <- queue.take - } yield item - )(isSubtype[MemberLeft](anything)).provideLayer(ZLayer.fromManaged(actorSystem)) + items <- ZStream + .fromQueue(queue) + .takeUntil { + case _: MemberLeft => true + case _ => false + } + .runCollect + } yield items + )(isNonEmpty).provideLayer(ZLayer.fromManaged(actorSystem)) } ) } diff --git a/src/test/scala/zio/akka/cluster/pubsub/PubSubSpec.scala b/src/test/scala/zio/akka/cluster/pubsub/PubSubSpec.scala index b4a3598..305c439 100644 --- a/src/test/scala/zio/akka/cluster/pubsub/PubSubSpec.scala +++ b/src/test/scala/zio/akka/cluster/pubsub/PubSubSpec.scala @@ -16,13 +16,15 @@ object PubSubSpec extends ZIOSpecDefault { | provider = "cluster" | } | remote { - | netty.tcp { + | enabled-transports = ["akka.remote.artery.canonical"] + | artery.canonical { | hostname = "127.0.0.1" | port = 2551 | } | } | cluster { - | seed-nodes = ["akka.tcp://Test@127.0.0.1:2551"] + | seed-nodes = ["akka://Test@127.0.0.1:2551"] + | downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider" | } |} """.stripMargin) diff --git a/src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala b/src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala index c73ac73..388eefa 100644 --- a/src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala +++ b/src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala @@ -18,14 +18,16 @@ object ShardingSpec extends ZIOSpecDefault { | provider = "cluster" | } | remote { - | netty.tcp { + | enabled-transports = ["akka.remote.artery.canonical"] + | artery.canonical { | hostname = "127.0.0.1" | port = 2551 | } | } | cluster { - | seed-nodes = ["akka.tcp://Test@127.0.0.1:2551"] + | seed-nodes = ["akka://Test@127.0.0.1:2551"] | jmx.multi-mbeans-in-same-jvm = on + | downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider" | } |} """.stripMargin) @@ -41,14 +43,16 @@ object ShardingSpec extends ZIOSpecDefault { | provider = "cluster" | } | remote { - | netty.tcp { + | enabled-transports = ["akka.remote.artery.canonical"] + | artery.canonical { | hostname = "127.0.0.1" | port = 2552 | } | } | cluster { - | seed-nodes = ["akka.tcp://Test@127.0.0.1:2551"] + | seed-nodes = ["akka://Test@127.0.0.1:2552"] | jmx.multi-mbeans-in-same-jvm = on + | downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider" | } |} """.stripMargin) @@ -190,9 +194,9 @@ object ShardingSpec extends ZIOSpecDefault { onMessage1 = (_: String) => p1.succeed(()).unit onMessage2 = (_: String) => p2.succeed(()).unit sharding1 <- Sharding.start(shardName, onMessage1).provideEnvironment(a1) - _ <- Sharding.start(shardName, onMessage2).provideEnvironment(a2) + sharding2 <- Sharding.start(shardName, onMessage2).provideEnvironment(a2) _ <- sharding1.send("1", "hi") - _ <- sharding1.send("2", "hi") + _ <- sharding2.send("2", "hi") _ <- p1.await _ <- p2.await } yield () From de07d967649ebf5928116a796af8f7bb4f68be3b Mon Sep 17 00:00:00 2001 From: Pierre Ricadat Date: Thu, 6 Jan 2022 09:06:55 +0900 Subject: [PATCH 04/11] Fix test (#156) --- src/test/scala/zio/akka/cluster/ClusterSpec.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/test/scala/zio/akka/cluster/ClusterSpec.scala b/src/test/scala/zio/akka/cluster/ClusterSpec.scala index fb6ca86..a76fdd4 100644 --- a/src/test/scala/zio/akka/cluster/ClusterSpec.scala +++ b/src/test/scala/zio/akka/cluster/ClusterSpec.scala @@ -1,5 +1,7 @@ package zio.akka.cluster +import scala.language.postfixOps + import akka.actor.ActorSystem import akka.cluster.ClusterEvent.MemberLeft import com.typesafe.config.{ Config, ConfigFactory } @@ -8,7 +10,7 @@ import zio.test.Assertion._ import zio.test._ import zio.test.TestEnvironment import zio.test.ZIOSpecDefault -import zio.{ Managed, Task, ZLayer } +import zio._ object ClusterSpec extends ZIOSpecDefault { @@ -42,6 +44,7 @@ object ClusterSpec extends ZIOSpecDefault { assertM( for { queue <- Cluster.clusterEvents() + _ <- Clock.sleep(5 seconds) _ <- Cluster.leave items <- ZStream .fromQueue(queue) @@ -50,8 +53,9 @@ object ClusterSpec extends ZIOSpecDefault { case _ => false } .runCollect + .timeoutFail(new Exception("Timeout"))(10 seconds) } yield items - )(isNonEmpty).provideLayer(ZLayer.fromManaged(actorSystem)) + )(isNonEmpty).provideLayer(ZLayer.fromManaged(actorSystem) ++ Clock.live) } ) } From 50773d016bb562e8a79685e805cc5b72bd854b13 Mon Sep 17 00:00:00 2001 From: Peter Kotula Date: Thu, 24 Mar 2022 15:06:46 +0100 Subject: [PATCH 05/11] zio2 rc3 (#159) --- build.sbt | 4 +- src/main/scala/zio/akka/cluster/Cluster.scala | 4 +- .../cluster/pubsub/impl/SubscriberImpl.scala | 2 +- .../zio/akka/cluster/sharding/Sharding.scala | 8 +-- .../scala/zio/akka/cluster/ClusterSpec.scala | 8 ++- .../zio/akka/cluster/pubsub/PubSubSpec.scala | 9 ++-- .../akka/cluster/sharding/ShardingSpec.scala | 50 ++++++++++--------- 7 files changed, 43 insertions(+), 42 deletions(-) diff --git a/build.sbt b/build.sbt index 155bb6f..240b31c 100644 --- a/build.sbt +++ b/build.sbt @@ -1,8 +1,8 @@ val mainScala = "2.13.7" val allScala = Seq("2.12.15", mainScala) -val zioVersion = "2.0.0-RC1" -val akkaVersion = "2.6.18" +val zioVersion = "2.0.0-RC3" +val akkaVersion = "2.6.19" organization := "dev.zio" homepage := Some(url("https://github.com/zio/zio-akka-cluster")) diff --git a/src/main/scala/zio/akka/cluster/Cluster.scala b/src/main/scala/zio/akka/cluster/Cluster.scala index 38ba7c2..2e94bd5 100644 --- a/src/main/scala/zio/akka/cluster/Cluster.scala +++ b/src/main/scala/zio/akka/cluster/Cluster.scala @@ -61,9 +61,9 @@ object Cluster { initialStateAsEvents: Boolean = false ): ZIO[ActorSystem, Throwable, Unit] = for { - rts <- Task.runtime + rts <- Task.runtime[ActorSystem] actorSystem <- ZIO.service[ActorSystem] - _ <- Task(actorSystem.actorOf(Props(new SubscriberActor(rts, queue, initialStateAsEvents)))) + _ <- Task.attempt(actorSystem.actorOf(Props(new SubscriberActor(rts, queue, initialStateAsEvents)))) } yield () private[cluster] class SubscriberActor( diff --git a/src/main/scala/zio/akka/cluster/pubsub/impl/SubscriberImpl.scala b/src/main/scala/zio/akka/cluster/pubsub/impl/SubscriberImpl.scala index bfbb178..c2b3e46 100644 --- a/src/main/scala/zio/akka/cluster/pubsub/impl/SubscriberImpl.scala +++ b/src/main/scala/zio/akka/cluster/pubsub/impl/SubscriberImpl.scala @@ -13,7 +13,7 @@ private[pubsub] trait SubscriberImpl[A] extends Subscriber[A] { override def listenWith(topic: String, queue: Queue[A], group: Option[String] = None): Task[Unit] = for { - rts <- Task.runtime + rts <- Task.runtime[Any] subscribed <- Promise.make[Nothing, Unit] _ <- Task( getActorSystem.actorOf(Props(new SubscriberActor[A](getMediator, topic, group, rts, queue, subscribed))) diff --git a/src/main/scala/zio/akka/cluster/sharding/Sharding.scala b/src/main/scala/zio/akka/cluster/sharding/Sharding.scala index 6f35169..ec724b5 100644 --- a/src/main/scala/zio/akka/cluster/sharding/Sharding.scala +++ b/src/main/scala/zio/akka/cluster/sharding/Sharding.scala @@ -139,10 +139,10 @@ object Sharding { override def context: ActorContext = actorContext override def id: String = actorContext.self.path.name override def state: Ref[Option[State]] = ref - override def stop: UIO[Unit] = UIO(actorContext.stop(self)) - override def passivate: UIO[Unit] = UIO(actorContext.parent ! Passivate(PoisonPill)) - override def passivateAfter(duration: Duration): UIO[Unit] = UIO(actorContext.self ! SetTimeout(duration)) - override def replyToSender[M](msg: M): Task[Unit] = Task(actorContext.sender() ! msg) + override def stop: UIO[Unit] = UIO.succeed(actorContext.stop(self)) + override def passivate: UIO[Unit] = UIO.succeed(actorContext.parent ! Passivate(PoisonPill)) + override def passivateAfter(duration: Duration): UIO[Unit] = UIO.succeed(actorContext.self ! SetTimeout(duration)) + override def replyToSender[M](msg: M): Task[Unit] = Task.attempt(actorContext.sender() ! msg) } val entity: ZLayer[Any, Nothing, Entity[State]] = ZLayer.succeed(service) diff --git a/src/test/scala/zio/akka/cluster/ClusterSpec.scala b/src/test/scala/zio/akka/cluster/ClusterSpec.scala index a76fdd4..463ac9a 100644 --- a/src/test/scala/zio/akka/cluster/ClusterSpec.scala +++ b/src/test/scala/zio/akka/cluster/ClusterSpec.scala @@ -36,10 +36,8 @@ object ClusterSpec extends ZIOSpecDefault { |} """.stripMargin) - val actorSystem: Managed[Throwable, ActorSystem] = - Managed.acquireReleaseWith(Task(ActorSystem("Test", config)))(sys => - Task.fromFuture(_ => sys.terminate()).either - ) + val actorSystem: ZIO[Scope, Throwable, ActorSystem] = + ZIO.acquireRelease(Task(ActorSystem("Test", config)))(sys => Task.fromFuture(_ => sys.terminate()).either) assertM( for { @@ -55,7 +53,7 @@ object ClusterSpec extends ZIOSpecDefault { .runCollect .timeoutFail(new Exception("Timeout"))(10 seconds) } yield items - )(isNonEmpty).provideLayer(ZLayer.fromManaged(actorSystem) ++ Clock.live) + )(isNonEmpty).provideLayer(ZLayer.scoped(actorSystem) ++ Clock.live) } ) } diff --git a/src/test/scala/zio/akka/cluster/pubsub/PubSubSpec.scala b/src/test/scala/zio/akka/cluster/pubsub/PubSubSpec.scala index 305c439..78827da 100644 --- a/src/test/scala/zio/akka/cluster/pubsub/PubSubSpec.scala +++ b/src/test/scala/zio/akka/cluster/pubsub/PubSubSpec.scala @@ -6,7 +6,7 @@ import zio.test.Assertion._ import zio.test._ import zio.test.TestEnvironment import zio.test.ZIOSpecDefault -import zio.{ ExecutionStrategy, Managed, Task, ZLayer } +import zio.{ ExecutionStrategy, Task, ZIO, ZLayer } object PubSubSpec extends ZIOSpecDefault { @@ -30,9 +30,10 @@ object PubSubSpec extends ZIOSpecDefault { """.stripMargin) val actorSystem: ZLayer[Any, Throwable, ActorSystem] = - ZLayer.fromManaged( - Managed.acquireReleaseWith(Task(ActorSystem("Test", config)))(sys => Task.fromFuture(_ => sys.terminate()).either) - ) + ZLayer + .scoped( + ZIO.acquireRelease(Task(ActorSystem("Test", config)))(sys => Task.fromFuture(_ => sys.terminate()).either) + ) val topic = "topic" val msg = "yo" diff --git a/src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala b/src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala index 388eefa..9ea8580 100644 --- a/src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala +++ b/src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala @@ -7,7 +7,7 @@ import zio.test.Assertion._ import zio.test._ import zio.test.TestEnvironment import zio.test.ZIOSpecDefault -import zio.{ ExecutionStrategy, Managed, Promise, Task, UIO, ZIO, ZLayer } +import zio.{ ExecutionStrategy, Promise, Task, UIO, ZIO, ZLayer } import zio._ object ShardingSpec extends ZIOSpecDefault { @@ -33,9 +33,10 @@ object ShardingSpec extends ZIOSpecDefault { """.stripMargin) val actorSystem: ZLayer[Any, Throwable, ActorSystem] = - ZLayer.fromManaged( - Managed.acquireReleaseWith(Task(ActorSystem("Test", config)))(sys => Task.fromFuture(_ => sys.terminate()).either) - ) + ZLayer + .scoped( + ZIO.acquireRelease(Task(ActorSystem("Test", config)))(sys => Task.fromFuture(_ => sys.terminate()).either) + ) val config2: Config = ConfigFactory.parseString(s""" |akka { @@ -58,11 +59,10 @@ object ShardingSpec extends ZIOSpecDefault { """.stripMargin) val actorSystem2: ZLayer[Any, Throwable, ActorSystem] = - ZLayer.fromManaged( - Managed.acquireReleaseWith(Task(ActorSystem("Test", config2)))(sys => - Task.fromFuture(_ => sys.terminate()).either + ZLayer + .scoped( + ZIO.acquireRelease(Task(ActorSystem("Test", config2)))(sys => Task.fromFuture(_ => sys.terminate()).either) ) - ) val shardId = "shard" val shardName = "name" @@ -186,22 +186,24 @@ object ShardingSpec extends ZIOSpecDefault { }, test("work with 2 actor systems") { assertM( - actorSystem.build.use(a1 => - actorSystem2.build.use(a2 => - for { - p1 <- Promise.make[Nothing, Unit] - p2 <- Promise.make[Nothing, Unit] - onMessage1 = (_: String) => p1.succeed(()).unit - onMessage2 = (_: String) => p2.succeed(()).unit - sharding1 <- Sharding.start(shardName, onMessage1).provideEnvironment(a1) - sharding2 <- Sharding.start(shardName, onMessage2).provideEnvironment(a2) - _ <- sharding1.send("1", "hi") - _ <- sharding2.send("2", "hi") - _ <- p1.await - _ <- p2.await - } yield () + ZIO.scoped { + actorSystem.build.flatMap(a1 => + actorSystem2.build.flatMap(a2 => + for { + p1 <- Promise.make[Nothing, Unit] + p2 <- Promise.make[Nothing, Unit] + onMessage1 = (_: String) => p1.succeed(()).unit + onMessage2 = (_: String) => p2.succeed(()).unit + sharding1 <- Sharding.start(shardName, onMessage1).provideEnvironment(a1) + sharding2 <- Sharding.start(shardName, onMessage2).provideEnvironment(a2) + _ <- sharding1.send("1", "hi") + _ <- sharding2.send("2", "hi") + _ <- p1.await + _ <- p2.await + } yield () + ) ) - ) + } )(isUnit) }, test("provide proper environment to onMessage") { @@ -212,7 +214,7 @@ object ShardingSpec extends ZIOSpecDefault { ZIO.serviceWithZIO[TestService](_.doSomething()) val l = ZLayer.succeed(new TestService { - override def doSomething(): UIO[String] = UIO("test") + override def doSomething(): UIO[String] = UIO.succeed("test") }) assertM( From 44e486056d9a68ce5ad66c011cb46e22a47c9c60 Mon Sep 17 00:00:00 2001 From: Peter Kotula Date: Thu, 21 Apr 2022 01:46:16 +0200 Subject: [PATCH 06/11] Zio2 rc5 (#161) --- build.sbt | 2 +- src/main/scala/zio/akka/cluster/Cluster.scala | 8 ++--- .../zio/akka/cluster/pubsub/PubSub.scala | 3 +- .../cluster/pubsub/impl/PublisherImpl.scala | 2 +- .../cluster/pubsub/impl/SubscriberImpl.scala | 2 +- .../zio/akka/cluster/sharding/Sharding.scala | 10 +++--- .../scala/zio/akka/cluster/ClusterSpec.scala | 12 ++++--- .../zio/akka/cluster/pubsub/PubSubSpec.scala | 8 +++-- .../akka/cluster/sharding/ShardingSpec.scala | 33 ++++++++++--------- 9 files changed, 44 insertions(+), 36 deletions(-) diff --git a/build.sbt b/build.sbt index 240b31c..7f08398 100644 --- a/build.sbt +++ b/build.sbt @@ -1,7 +1,7 @@ val mainScala = "2.13.7" val allScala = Seq("2.12.15", mainScala) -val zioVersion = "2.0.0-RC3" +val zioVersion = "2.0.0-RC5" val akkaVersion = "2.6.19" organization := "dev.zio" diff --git a/src/main/scala/zio/akka/cluster/Cluster.scala b/src/main/scala/zio/akka/cluster/Cluster.scala index 2e94bd5..d51af5b 100644 --- a/src/main/scala/zio/akka/cluster/Cluster.scala +++ b/src/main/scala/zio/akka/cluster/Cluster.scala @@ -10,7 +10,7 @@ object Cluster { private val cluster: ZIO[ActorSystem, Throwable, akka.cluster.Cluster] = for { actorSystem <- ZIO.service[ActorSystem] - cluster <- Task(akka.cluster.Cluster(actorSystem)) + cluster <- Task.attempt(akka.cluster.Cluster(actorSystem)) } yield cluster /** @@ -19,7 +19,7 @@ object Cluster { val clusterState: ZIO[ActorSystem, Throwable, CurrentClusterState] = for { cluster <- cluster - state <- Task(cluster.state) + state <- Task.attempt(cluster.state) } yield state /** @@ -28,7 +28,7 @@ object Cluster { def join(seedNodes: List[Address]): ZIO[ActorSystem, Throwable, Unit] = for { cluster <- cluster - _ <- Task(cluster.joinSeedNodes(seedNodes)) + _ <- Task.attempt(cluster.joinSeedNodes(seedNodes)) } yield () /** @@ -37,7 +37,7 @@ object Cluster { val leave: ZIO[ActorSystem, Throwable, Unit] = for { cluster <- cluster - _ <- Task(cluster.leave(cluster.selfAddress)) + _ <- Task.attempt(cluster.leave(cluster.selfAddress)) } yield () /** diff --git a/src/main/scala/zio/akka/cluster/pubsub/PubSub.scala b/src/main/scala/zio/akka/cluster/pubsub/PubSub.scala index b34f2a8..5387304 100644 --- a/src/main/scala/zio/akka/cluster/pubsub/PubSub.scala +++ b/src/main/scala/zio/akka/cluster/pubsub/PubSub.scala @@ -30,7 +30,8 @@ trait PubSub[A] extends Publisher[A] with Subscriber[A] object PubSub { - private def getMediator(actorSystem: ActorSystem): Task[ActorRef] = Task(DistributedPubSub(actorSystem).mediator) + private def getMediator(actorSystem: ActorSystem): Task[ActorRef] = + Task.attempt(DistributedPubSub(actorSystem).mediator) /** * Creates a new `Publisher[A]`. diff --git a/src/main/scala/zio/akka/cluster/pubsub/impl/PublisherImpl.scala b/src/main/scala/zio/akka/cluster/pubsub/impl/PublisherImpl.scala index 0e5bba5..975b851 100644 --- a/src/main/scala/zio/akka/cluster/pubsub/impl/PublisherImpl.scala +++ b/src/main/scala/zio/akka/cluster/pubsub/impl/PublisherImpl.scala @@ -9,5 +9,5 @@ private[pubsub] trait PublisherImpl[A] extends Publisher[A] { val getMediator: ActorRef override def publish(topic: String, data: A, sendOneMessageToEachGroup: Boolean = false): Task[Unit] = - Task(getMediator ! Publish(topic, MessageEnvelope(data), sendOneMessageToEachGroup)) + Task.attempt(getMediator ! Publish(topic, MessageEnvelope(data), sendOneMessageToEachGroup)) } diff --git a/src/main/scala/zio/akka/cluster/pubsub/impl/SubscriberImpl.scala b/src/main/scala/zio/akka/cluster/pubsub/impl/SubscriberImpl.scala index c2b3e46..36fc4f3 100644 --- a/src/main/scala/zio/akka/cluster/pubsub/impl/SubscriberImpl.scala +++ b/src/main/scala/zio/akka/cluster/pubsub/impl/SubscriberImpl.scala @@ -15,7 +15,7 @@ private[pubsub] trait SubscriberImpl[A] extends Subscriber[A] { for { rts <- Task.runtime[Any] subscribed <- Promise.make[Nothing, Unit] - _ <- Task( + _ <- Task.attempt( getActorSystem.actorOf(Props(new SubscriberActor[A](getMediator, topic, group, rts, queue, subscribed))) ) _ <- subscribed.await diff --git a/src/main/scala/zio/akka/cluster/sharding/Sharding.scala b/src/main/scala/zio/akka/cluster/sharding/Sharding.scala index ec724b5..95ba9b8 100644 --- a/src/main/scala/zio/akka/cluster/sharding/Sharding.scala +++ b/src/main/scala/zio/akka/cluster/sharding/Sharding.scala @@ -46,7 +46,7 @@ object Sharding { for { rts <- ZIO.runtime[ActorSystem with R] actorSystem = rts.environment.get[ActorSystem] - shardingRegion <- Task( + shardingRegion <- Task.attempt( ClusterSharding(actorSystem).start( typeName = name, entityProps = Props(new ShardEntity[R, Msg, State](rts)(onMessage)), @@ -87,7 +87,7 @@ object Sharding { for { rts <- ZIO.runtime[ActorSystem] actorSystem = rts.environment.get - shardingRegion <- Task( + shardingRegion <- Task.attempt( ClusterSharding(actorSystem).startProxy( typeName = name, role, @@ -114,13 +114,13 @@ object Sharding { val getShardingRegion: ActorRef override def send(entityId: String, data: Msg): Task[Unit] = - Task(getShardingRegion ! sharding.MessageEnvelope(entityId, MessagePayload(data))) + Task.attempt(getShardingRegion ! sharding.MessageEnvelope(entityId, MessagePayload(data))) override def stop(entityId: String): Task[Unit] = - Task(getShardingRegion ! sharding.MessageEnvelope(entityId, PoisonPillPayload)) + Task.attempt(getShardingRegion ! sharding.MessageEnvelope(entityId, PoisonPillPayload)) override def passivate(entityId: String): Task[Unit] = - Task(getShardingRegion ! sharding.MessageEnvelope(entityId, PassivatePayload)) + Task.attempt(getShardingRegion ! sharding.MessageEnvelope(entityId, PassivatePayload)) override def ask[R](entityId: String, data: Msg)(implicit tag: ClassTag[R], proof: R =!= Nothing): Task[R] = Task.fromFuture(_ => diff --git a/src/test/scala/zio/akka/cluster/ClusterSpec.scala b/src/test/scala/zio/akka/cluster/ClusterSpec.scala index 463ac9a..4750e64 100644 --- a/src/test/scala/zio/akka/cluster/ClusterSpec.scala +++ b/src/test/scala/zio/akka/cluster/ClusterSpec.scala @@ -26,18 +26,20 @@ object ClusterSpec extends ZIOSpecDefault { | enabled-transports = ["akka.remote.artery.canonical"] | artery.canonical { | hostname = "127.0.0.1" - | port = 2551 + | port = 2554 | } | } | cluster { - | seed-nodes = ["akka://Test@127.0.0.1:2551"] + | seed-nodes = ["akka://Test@127.0.0.1:2554"] | downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider" | } |} """.stripMargin) val actorSystem: ZIO[Scope, Throwable, ActorSystem] = - ZIO.acquireRelease(Task(ActorSystem("Test", config)))(sys => Task.fromFuture(_ => sys.terminate()).either) + ZIO.acquireRelease(Task.attempt(ActorSystem("Test", config)))(sys => + Task.fromFuture(_ => sys.terminate()).either + ) assertM( for { @@ -53,7 +55,7 @@ object ClusterSpec extends ZIOSpecDefault { .runCollect .timeoutFail(new Exception("Timeout"))(10 seconds) } yield items - )(isNonEmpty).provideLayer(ZLayer.scoped(actorSystem) ++ Clock.live) + )(isNonEmpty).provideLayer(ZLayer.scoped(actorSystem)) } - ) + ) @@ TestAspect.withLiveClock } diff --git a/src/test/scala/zio/akka/cluster/pubsub/PubSubSpec.scala b/src/test/scala/zio/akka/cluster/pubsub/PubSubSpec.scala index 78827da..1b9e208 100644 --- a/src/test/scala/zio/akka/cluster/pubsub/PubSubSpec.scala +++ b/src/test/scala/zio/akka/cluster/pubsub/PubSubSpec.scala @@ -19,11 +19,11 @@ object PubSubSpec extends ZIOSpecDefault { | enabled-transports = ["akka.remote.artery.canonical"] | artery.canonical { | hostname = "127.0.0.1" - | port = 2551 + | port = 2553 | } | } | cluster { - | seed-nodes = ["akka://Test@127.0.0.1:2551"] + | seed-nodes = ["akka://Test@127.0.0.1:2553"] | downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider" | } |} @@ -32,7 +32,9 @@ object PubSubSpec extends ZIOSpecDefault { val actorSystem: ZLayer[Any, Throwable, ActorSystem] = ZLayer .scoped( - ZIO.acquireRelease(Task(ActorSystem("Test", config)))(sys => Task.fromFuture(_ => sys.terminate()).either) + ZIO.acquireRelease(Task.attempt(ActorSystem("Test", config)))(sys => + Task.fromFuture(_ => sys.terminate()).either + ) ) val topic = "topic" diff --git a/src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala b/src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala index 9ea8580..7a497b2 100644 --- a/src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala +++ b/src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala @@ -35,7 +35,9 @@ object ShardingSpec extends ZIOSpecDefault { val actorSystem: ZLayer[Any, Throwable, ActorSystem] = ZLayer .scoped( - ZIO.acquireRelease(Task(ActorSystem("Test", config)))(sys => Task.fromFuture(_ => sys.terminate()).either) + ZIO.acquireRelease(Task.attempt(ActorSystem("Test", config)))(sys => + Task.fromFuture(_ => sys.terminate()).either + ) ) val config2: Config = ConfigFactory.parseString(s""" @@ -61,7 +63,9 @@ object ShardingSpec extends ZIOSpecDefault { val actorSystem2: ZLayer[Any, Throwable, ActorSystem] = ZLayer .scoped( - ZIO.acquireRelease(Task(ActorSystem("Test", config2)))(sys => Task.fromFuture(_ => sys.terminate()).either) + ZIO.acquireRelease(Task.attempt(ActorSystem("Test", config2)))(sys => + Task.fromFuture(_ => sys.terminate()).either + ) ) val shardId = "shard" @@ -129,10 +133,9 @@ object ShardingSpec extends ZIOSpecDefault { sharding <- Sharding.start(shardName, onMessage) _ <- sharding.send(shardId, "set") _ <- sharding.send(shardId, "die") - _ <- ZIO.sleep(3 seconds) - .provideLayer( - Clock.live - ) // give time to the ShardCoordinator to notice the death of the actor and recreate one + _ <- Clock.sleep( + 3 seconds + ) // give time to the ShardCoordinator to notice the death of the actor and recreate one _ <- sharding.send(shardId, "get") res <- p.await } yield res @@ -151,10 +154,9 @@ object ShardingSpec extends ZIOSpecDefault { sharding <- Sharding.start(shardName, onMessage) _ <- sharding.send(shardId, "set") _ <- sharding.passivate(shardId) - _ <- ZIO.sleep(3 seconds) - .provideLayer( - Clock.live - ) // give time to the ShardCoordinator to notice the death of the actor and recreate one + _ <- Clock.sleep( + 3 seconds + ) // give time to the ShardCoordinator to notice the death of the actor and recreate one _ <- sharding.send(shardId, "get") res <- p.await } yield res @@ -175,10 +177,9 @@ object ShardingSpec extends ZIOSpecDefault { sharding <- Sharding.start(shardName, onMessage) _ <- sharding.send(shardId, "set") _ <- sharding.send(shardId, "timeout") - _ <- ZIO.sleep(3 seconds) - .provideLayer( - Clock.live - ) // give time to the ShardCoordinator to notice the death of the actor and recreate one + _ <- Clock.sleep( + 3 seconds + ) // give time to the ShardCoordinator to notice the death of the actor and recreate one _ <- sharding.send(shardId, "get") res <- p.await } yield res @@ -227,6 +228,8 @@ object ShardingSpec extends ZIOSpecDefault { } yield res )(equalTo("test")).provideLayer(actorSystem ++ l) } - ) @@ TestAspect.executionStrategy(ExecutionStrategy.Sequential) @@ TestAspect.timeout(30.seconds) + ) @@ TestAspect.executionStrategy(ExecutionStrategy.Sequential) @@ TestAspect.timeout( + 30.seconds + ) @@ TestAspect.withLiveClock } From 146b343e99bcb980f5c3fd9c8e4d283f7fa75f39 Mon Sep 17 00:00:00 2001 From: Peter Kotula Date: Wed, 11 May 2022 09:00:35 +0200 Subject: [PATCH 07/11] zio2 rc6 (#162) --- build.sbt | 2 +- src/main/scala/zio/akka/cluster/Cluster.scala | 14 ++++----- .../zio/akka/cluster/pubsub/PubSub.scala | 2 +- .../cluster/pubsub/impl/PublisherImpl.scala | 4 +-- .../cluster/pubsub/impl/SubscriberImpl.scala | 6 ++-- .../zio/akka/cluster/sharding/Sharding.scala | 20 ++++++------- .../scala/zio/akka/cluster/ClusterSpec.scala | 8 ++--- .../zio/akka/cluster/pubsub/PubSubSpec.scala | 18 +++++------ .../akka/cluster/sharding/ShardingSpec.scala | 30 +++++++++---------- 9 files changed, 50 insertions(+), 54 deletions(-) diff --git a/build.sbt b/build.sbt index 7f08398..16b973e 100644 --- a/build.sbt +++ b/build.sbt @@ -1,7 +1,7 @@ val mainScala = "2.13.7" val allScala = Seq("2.12.15", mainScala) -val zioVersion = "2.0.0-RC5" +val zioVersion = "2.0.0-RC6" val akkaVersion = "2.6.19" organization := "dev.zio" diff --git a/src/main/scala/zio/akka/cluster/Cluster.scala b/src/main/scala/zio/akka/cluster/Cluster.scala index d51af5b..571120d 100644 --- a/src/main/scala/zio/akka/cluster/Cluster.scala +++ b/src/main/scala/zio/akka/cluster/Cluster.scala @@ -3,14 +3,14 @@ package zio.akka.cluster import akka.actor.{ Actor, ActorSystem, Address, PoisonPill, Props } import akka.cluster.ClusterEvent._ import zio.Exit.{ Failure, Success } -import zio.{ Queue, Runtime, Task, ZIO } +import zio.{ Queue, Runtime, ZIO } object Cluster { private val cluster: ZIO[ActorSystem, Throwable, akka.cluster.Cluster] = for { actorSystem <- ZIO.service[ActorSystem] - cluster <- Task.attempt(akka.cluster.Cluster(actorSystem)) + cluster <- ZIO.attempt(akka.cluster.Cluster(actorSystem)) } yield cluster /** @@ -19,7 +19,7 @@ object Cluster { val clusterState: ZIO[ActorSystem, Throwable, CurrentClusterState] = for { cluster <- cluster - state <- Task.attempt(cluster.state) + state <- ZIO.attempt(cluster.state) } yield state /** @@ -28,7 +28,7 @@ object Cluster { def join(seedNodes: List[Address]): ZIO[ActorSystem, Throwable, Unit] = for { cluster <- cluster - _ <- Task.attempt(cluster.joinSeedNodes(seedNodes)) + _ <- ZIO.attempt(cluster.joinSeedNodes(seedNodes)) } yield () /** @@ -37,7 +37,7 @@ object Cluster { val leave: ZIO[ActorSystem, Throwable, Unit] = for { cluster <- cluster - _ <- Task.attempt(cluster.leave(cluster.selfAddress)) + _ <- ZIO.attempt(cluster.leave(cluster.selfAddress)) } yield () /** @@ -61,9 +61,9 @@ object Cluster { initialStateAsEvents: Boolean = false ): ZIO[ActorSystem, Throwable, Unit] = for { - rts <- Task.runtime[ActorSystem] + rts <- ZIO.runtime[ActorSystem] actorSystem <- ZIO.service[ActorSystem] - _ <- Task.attempt(actorSystem.actorOf(Props(new SubscriberActor(rts, queue, initialStateAsEvents)))) + _ <- ZIO.attempt(actorSystem.actorOf(Props(new SubscriberActor(rts, queue, initialStateAsEvents)))) } yield () private[cluster] class SubscriberActor( diff --git a/src/main/scala/zio/akka/cluster/pubsub/PubSub.scala b/src/main/scala/zio/akka/cluster/pubsub/PubSub.scala index 5387304..8025245 100644 --- a/src/main/scala/zio/akka/cluster/pubsub/PubSub.scala +++ b/src/main/scala/zio/akka/cluster/pubsub/PubSub.scala @@ -31,7 +31,7 @@ trait PubSub[A] extends Publisher[A] with Subscriber[A] object PubSub { private def getMediator(actorSystem: ActorSystem): Task[ActorRef] = - Task.attempt(DistributedPubSub(actorSystem).mediator) + ZIO.attempt(DistributedPubSub(actorSystem).mediator) /** * Creates a new `Publisher[A]`. diff --git a/src/main/scala/zio/akka/cluster/pubsub/impl/PublisherImpl.scala b/src/main/scala/zio/akka/cluster/pubsub/impl/PublisherImpl.scala index 975b851..32345b0 100644 --- a/src/main/scala/zio/akka/cluster/pubsub/impl/PublisherImpl.scala +++ b/src/main/scala/zio/akka/cluster/pubsub/impl/PublisherImpl.scala @@ -3,11 +3,11 @@ package zio.akka.cluster.pubsub.impl import akka.actor.ActorRef import akka.cluster.pubsub.DistributedPubSubMediator.Publish import zio.akka.cluster.pubsub.{ MessageEnvelope, Publisher } -import zio.Task +import zio.{ Task, ZIO } private[pubsub] trait PublisherImpl[A] extends Publisher[A] { val getMediator: ActorRef override def publish(topic: String, data: A, sendOneMessageToEachGroup: Boolean = false): Task[Unit] = - Task.attempt(getMediator ! Publish(topic, MessageEnvelope(data), sendOneMessageToEachGroup)) + ZIO.attempt(getMediator ! Publish(topic, MessageEnvelope(data), sendOneMessageToEachGroup)) } diff --git a/src/main/scala/zio/akka/cluster/pubsub/impl/SubscriberImpl.scala b/src/main/scala/zio/akka/cluster/pubsub/impl/SubscriberImpl.scala index 36fc4f3..ad723fd 100644 --- a/src/main/scala/zio/akka/cluster/pubsub/impl/SubscriberImpl.scala +++ b/src/main/scala/zio/akka/cluster/pubsub/impl/SubscriberImpl.scala @@ -5,7 +5,7 @@ import akka.cluster.pubsub.DistributedPubSubMediator.{ Subscribe, SubscribeAck } import zio.Exit.{ Failure, Success } import zio.akka.cluster.pubsub.impl.SubscriberImpl.SubscriberActor import zio.akka.cluster.pubsub.{ MessageEnvelope, Subscriber } -import zio.{ Promise, Queue, Runtime, Task } +import zio.{ Promise, Queue, Runtime, Task, ZIO } private[pubsub] trait SubscriberImpl[A] extends Subscriber[A] { val getActorSystem: ActorSystem @@ -13,9 +13,9 @@ private[pubsub] trait SubscriberImpl[A] extends Subscriber[A] { override def listenWith(topic: String, queue: Queue[A], group: Option[String] = None): Task[Unit] = for { - rts <- Task.runtime[Any] + rts <- ZIO.runtime[Any] subscribed <- Promise.make[Nothing, Unit] - _ <- Task.attempt( + _ <- ZIO.attempt( getActorSystem.actorOf(Props(new SubscriberActor[A](getMediator, topic, group, rts, queue, subscribed))) ) _ <- subscribed.await diff --git a/src/main/scala/zio/akka/cluster/sharding/Sharding.scala b/src/main/scala/zio/akka/cluster/sharding/Sharding.scala index 95ba9b8..c9fc9d5 100644 --- a/src/main/scala/zio/akka/cluster/sharding/Sharding.scala +++ b/src/main/scala/zio/akka/cluster/sharding/Sharding.scala @@ -46,7 +46,7 @@ object Sharding { for { rts <- ZIO.runtime[ActorSystem with R] actorSystem = rts.environment.get[ActorSystem] - shardingRegion <- Task.attempt( + shardingRegion <- ZIO.attempt( ClusterSharding(actorSystem).start( typeName = name, entityProps = Props(new ShardEntity[R, Msg, State](rts)(onMessage)), @@ -87,7 +87,7 @@ object Sharding { for { rts <- ZIO.runtime[ActorSystem] actorSystem = rts.environment.get - shardingRegion <- Task.attempt( + shardingRegion <- ZIO.attempt( ClusterSharding(actorSystem).startProxy( typeName = name, role, @@ -114,16 +114,16 @@ object Sharding { val getShardingRegion: ActorRef override def send(entityId: String, data: Msg): Task[Unit] = - Task.attempt(getShardingRegion ! sharding.MessageEnvelope(entityId, MessagePayload(data))) + ZIO.attempt(getShardingRegion ! sharding.MessageEnvelope(entityId, MessagePayload(data))) override def stop(entityId: String): Task[Unit] = - Task.attempt(getShardingRegion ! sharding.MessageEnvelope(entityId, PoisonPillPayload)) + ZIO.attempt(getShardingRegion ! sharding.MessageEnvelope(entityId, PoisonPillPayload)) override def passivate(entityId: String): Task[Unit] = - Task.attempt(getShardingRegion ! sharding.MessageEnvelope(entityId, PassivatePayload)) + ZIO.attempt(getShardingRegion ! sharding.MessageEnvelope(entityId, PassivatePayload)) override def ask[R](entityId: String, data: Msg)(implicit tag: ClassTag[R], proof: R =!= Nothing): Task[R] = - Task.fromFuture(_ => + ZIO.fromFuture(_ => (getShardingRegion ? sharding.MessageEnvelope(entityId, MessagePayload(data))) .mapTo[R] ) @@ -139,10 +139,10 @@ object Sharding { override def context: ActorContext = actorContext override def id: String = actorContext.self.path.name override def state: Ref[Option[State]] = ref - override def stop: UIO[Unit] = UIO.succeed(actorContext.stop(self)) - override def passivate: UIO[Unit] = UIO.succeed(actorContext.parent ! Passivate(PoisonPill)) - override def passivateAfter(duration: Duration): UIO[Unit] = UIO.succeed(actorContext.self ! SetTimeout(duration)) - override def replyToSender[M](msg: M): Task[Unit] = Task.attempt(actorContext.sender() ! msg) + override def stop: UIO[Unit] = ZIO.succeed(actorContext.stop(self)) + override def passivate: UIO[Unit] = ZIO.succeed(actorContext.parent ! Passivate(PoisonPill)) + override def passivateAfter(duration: Duration): UIO[Unit] = ZIO.succeed(actorContext.self ! SetTimeout(duration)) + override def replyToSender[M](msg: M): Task[Unit] = ZIO.attempt(actorContext.sender() ! msg) } val entity: ZLayer[Any, Nothing, Entity[State]] = ZLayer.succeed(service) diff --git a/src/test/scala/zio/akka/cluster/ClusterSpec.scala b/src/test/scala/zio/akka/cluster/ClusterSpec.scala index 4750e64..a45dd0d 100644 --- a/src/test/scala/zio/akka/cluster/ClusterSpec.scala +++ b/src/test/scala/zio/akka/cluster/ClusterSpec.scala @@ -14,7 +14,7 @@ import zio._ object ClusterSpec extends ZIOSpecDefault { - def spec: ZSpec[TestEnvironment, Any] = + def spec: Spec[TestEnvironment, Any] = suite("ClusterSpec")( test("receive cluster events") { val config: Config = ConfigFactory.parseString(s""" @@ -37,11 +37,11 @@ object ClusterSpec extends ZIOSpecDefault { """.stripMargin) val actorSystem: ZIO[Scope, Throwable, ActorSystem] = - ZIO.acquireRelease(Task.attempt(ActorSystem("Test", config)))(sys => - Task.fromFuture(_ => sys.terminate()).either + ZIO.acquireRelease(ZIO.attempt(ActorSystem("Test", config)))(sys => + ZIO.fromFuture(_ => sys.terminate()).either ) - assertM( + assertZIO( for { queue <- Cluster.clusterEvents() _ <- Clock.sleep(5 seconds) diff --git a/src/test/scala/zio/akka/cluster/pubsub/PubSubSpec.scala b/src/test/scala/zio/akka/cluster/pubsub/PubSubSpec.scala index 1b9e208..fe899bc 100644 --- a/src/test/scala/zio/akka/cluster/pubsub/PubSubSpec.scala +++ b/src/test/scala/zio/akka/cluster/pubsub/PubSubSpec.scala @@ -6,7 +6,7 @@ import zio.test.Assertion._ import zio.test._ import zio.test.TestEnvironment import zio.test.ZIOSpecDefault -import zio.{ ExecutionStrategy, Task, ZIO, ZLayer } +import zio.{ ExecutionStrategy, ZIO, ZLayer } object PubSubSpec extends ZIOSpecDefault { @@ -32,18 +32,16 @@ object PubSubSpec extends ZIOSpecDefault { val actorSystem: ZLayer[Any, Throwable, ActorSystem] = ZLayer .scoped( - ZIO.acquireRelease(Task.attempt(ActorSystem("Test", config)))(sys => - Task.fromFuture(_ => sys.terminate()).either - ) + ZIO.acquireRelease(ZIO.attempt(ActorSystem("Test", config)))(sys => ZIO.fromFuture(_ => sys.terminate()).either) ) val topic = "topic" val msg = "yo" - def spec: ZSpec[TestEnvironment, Any] = + def spec: Spec[TestEnvironment, Any] = suite("PubSubSpec")( test("send and receive a single message") { - assertM( + assertZIO( for { pubSub <- PubSub.createPubSub[String] queue <- pubSub.listen(topic) @@ -53,7 +51,7 @@ object PubSubSpec extends ZIOSpecDefault { )(equalTo(msg)).provideLayer(actorSystem) }, test("support multiple subscribers") { - assertM( + assertZIO( for { pubSub <- PubSub.createPubSub[String] queue1 <- pubSub.listen(topic) @@ -66,7 +64,7 @@ object PubSubSpec extends ZIOSpecDefault { }, test("support multiple publishers") { val msg2 = "what's up" - assertM( + assertZIO( for { pubSub <- PubSub.createPubSub[String] queue <- pubSub.listen(topic) @@ -79,7 +77,7 @@ object PubSubSpec extends ZIOSpecDefault { }, test("send only one message to a single group") { val group = "group" - assertM( + assertZIO( for { pubSub <- PubSub.createPubSub[String] queue1 <- pubSub.listen(topic, Some(group)) @@ -93,7 +91,7 @@ object PubSubSpec extends ZIOSpecDefault { test("send one message to each group") { val group1 = "group1" val group2 = "group2" - assertM( + assertZIO( for { pubSub <- PubSub.createPubSub[String] queue1 <- pubSub.listen(topic, Some(group1)) diff --git a/src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala b/src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala index 7a497b2..a3c3d2e 100644 --- a/src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala +++ b/src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala @@ -7,7 +7,7 @@ import zio.test.Assertion._ import zio.test._ import zio.test.TestEnvironment import zio.test.ZIOSpecDefault -import zio.{ ExecutionStrategy, Promise, Task, UIO, ZIO, ZLayer } +import zio.{ ExecutionStrategy, Promise, UIO, ZIO, ZLayer } import zio._ object ShardingSpec extends ZIOSpecDefault { @@ -35,9 +35,7 @@ object ShardingSpec extends ZIOSpecDefault { val actorSystem: ZLayer[Any, Throwable, ActorSystem] = ZLayer .scoped( - ZIO.acquireRelease(Task.attempt(ActorSystem("Test", config)))(sys => - Task.fromFuture(_ => sys.terminate()).either - ) + ZIO.acquireRelease(ZIO.attempt(ActorSystem("Test", config)))(sys => ZIO.fromFuture(_ => sys.terminate()).either) ) val config2: Config = ConfigFactory.parseString(s""" @@ -63,8 +61,8 @@ object ShardingSpec extends ZIOSpecDefault { val actorSystem2: ZLayer[Any, Throwable, ActorSystem] = ZLayer .scoped( - ZIO.acquireRelease(Task.attempt(ActorSystem("Test", config2)))(sys => - Task.fromFuture(_ => sys.terminate()).either + ZIO.acquireRelease(ZIO.attempt(ActorSystem("Test", config2)))(sys => + ZIO.fromFuture(_ => sys.terminate()).either ) ) @@ -72,10 +70,10 @@ object ShardingSpec extends ZIOSpecDefault { val shardName = "name" val msg = "yo" - def spec: ZSpec[TestEnvironment, Any] = + def spec: Spec[TestEnvironment, Any] = suite("ShardingSpec")( test("send and receive a single message") { - assertM( + assertZIO( for { p <- Promise.make[Nothing, String] onMessage = (msg: String) => p.succeed(msg).unit @@ -88,7 +86,7 @@ object ShardingSpec extends ZIOSpecDefault { test("send and receive a message using ask") { val onMessage: String => ZIO[Entity[Any], Nothing, Unit] = incomingMsg => ZIO.serviceWithZIO[Entity[Any]](_.replyToSender(incomingMsg).orDie) - assertM( + assertZIO( for { sharding <- Sharding.start(shardName, onMessage) reply <- sharding.ask[String](shardId, msg) @@ -96,7 +94,7 @@ object ShardingSpec extends ZIOSpecDefault { )(equalTo(msg)).provideLayer(actorSystem) }, test("gather state") { - assertM( + assertZIO( for { p <- Promise.make[Nothing, Boolean] onMessage = (_: String) => @@ -120,7 +118,7 @@ object ShardingSpec extends ZIOSpecDefault { )(equalTo((None, true))).provideLayer(actorSystem) }, test("kill itself") { - assertM( + assertZIO( for { p <- Promise.make[Nothing, Option[Unit]] onMessage = (msg: String) => @@ -142,7 +140,7 @@ object ShardingSpec extends ZIOSpecDefault { )(isNone).provideLayer(actorSystem) }, test("passivate") { - assertM( + assertZIO( for { p <- Promise.make[Nothing, Option[Unit]] onMessage = (msg: String) => @@ -163,7 +161,7 @@ object ShardingSpec extends ZIOSpecDefault { )(isNone).provideLayer(actorSystem) }, test("passivateAfter") { - assertM( + assertZIO( for { p <- Promise.make[Nothing, Option[Unit]] onMessage = (msg: String) => @@ -186,7 +184,7 @@ object ShardingSpec extends ZIOSpecDefault { )(isNone).provideLayer(actorSystem) }, test("work with 2 actor systems") { - assertM( + assertZIO( ZIO.scoped { actorSystem.build.flatMap(a1 => actorSystem2.build.flatMap(a2 => @@ -215,10 +213,10 @@ object ShardingSpec extends ZIOSpecDefault { ZIO.serviceWithZIO[TestService](_.doSomething()) val l = ZLayer.succeed(new TestService { - override def doSomething(): UIO[String] = UIO.succeed("test") + override def doSomething(): UIO[String] = ZIO.succeed("test") }) - assertM( + assertZIO( for { p <- Promise.make[Nothing, String] onMessage = (_: String) => (doSomething flatMap p.succeed).unit From 6e7f0bba299f9a3f3ae105dfec59b8cfadd1db69 Mon Sep 17 00:00:00 2001 From: Peter Kotula Date: Wed, 11 May 2022 09:14:21 +0200 Subject: [PATCH 08/11] zio2 rc6 - readme (#163) --- README.md | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 9293ddb..0f027bd 100644 --- a/README.md +++ b/README.md @@ -111,11 +111,14 @@ This library wraps messages inside of a `zio.akka.cluster.pubsub.MessageEnvelope ```scala import akka.actor.ActorSystem -import zio.{ Managed, Task, ZLayer } +import zio.{ ZIO, ZLayer } import zio.akka.cluster.pubsub.PubSub val actorSystem: ZLayer[Any, Throwable, ActorSystem] = - ZLayer.fromManaged(Managed.acquireReleaseWith(Task(ActorSystem("Test")))(sys => Task.fromFuture(_ => sys.terminate()).either)) + ZLayer + .scoped( + ZIO.acquireRelease(ZIO.attempt(ActorSystem("Test")))(sys => ZIO.fromFuture(_ => sys.terminate()).either) + ) (for { pubSub <- PubSub.createPubSub[String] @@ -176,10 +179,13 @@ This library wraps messages inside of a `zio.akka.cluster.sharding.MessageEnvelo ```scala import akka.actor.ActorSystem import zio.akka.cluster.sharding.{ Entity, Sharding } -import zio.{ Managed, Task, ZIO, ZLayer } +import zio.{ ZIO, ZLayer } val actorSystem: ZLayer[Any, Throwable, ActorSystem] = - ZLayer.fromManaged(Managed.acquireReleaseWith(Task(ActorSystem("Test")))(sys => Task.fromFuture(_ => sys.terminate()).either)) + ZLayer + .scoped( + ZIO.acquireRelease(ZIO.attempt(ActorSystem("Test")))(sys => ZIO.fromFuture(_ => sys.terminate()).either) + ) val behavior: String => ZIO[Entity[Int], Nothing, Unit] = { case "+" => ZIO.serviceWithZIO[Entity[Int]](_.state.update(x => Some(x.getOrElse(0) + 1))) From d4923257729f79a190686a16649e1c1cd602b99f Mon Sep 17 00:00:00 2001 From: Peter Kotula Date: Sun, 26 Jun 2022 10:48:35 +0200 Subject: [PATCH 09/11] zio2 (#164) --- build.sbt | 2 +- src/main/scala/zio/akka/cluster/Cluster.scala | 12 +++++++----- .../cluster/pubsub/impl/SubscriberImpl.scala | 16 ++++++++++------ .../zio/akka/cluster/sharding/Sharding.scala | 11 ++++++++--- 4 files changed, 26 insertions(+), 15 deletions(-) diff --git a/build.sbt b/build.sbt index 16b973e..be22dd6 100644 --- a/build.sbt +++ b/build.sbt @@ -1,7 +1,7 @@ val mainScala = "2.13.7" val allScala = Seq("2.12.15", mainScala) -val zioVersion = "2.0.0-RC6" +val zioVersion = "2.0.0" val akkaVersion = "2.6.19" organization := "dev.zio" diff --git a/src/main/scala/zio/akka/cluster/Cluster.scala b/src/main/scala/zio/akka/cluster/Cluster.scala index 571120d..8ff3a3e 100644 --- a/src/main/scala/zio/akka/cluster/Cluster.scala +++ b/src/main/scala/zio/akka/cluster/Cluster.scala @@ -2,8 +2,7 @@ package zio.akka.cluster import akka.actor.{ Actor, ActorSystem, Address, PoisonPill, Props } import akka.cluster.ClusterEvent._ -import zio.Exit.{ Failure, Success } -import zio.{ Queue, Runtime, ZIO } +import zio.{ Exit, Queue, Runtime, Unsafe, ZIO } object Cluster { @@ -78,9 +77,12 @@ object Cluster { def receive: Actor.Receive = { case ev: ClusterDomainEvent => - rts.unsafeRunAsyncWith(queue.offer(ev)) { - case Success(_) => () - case Failure(cause) => if (cause.isInterrupted) self ! PoisonPill // stop listening if the queue was shut down + Unsafe.unsafeCompat { implicit u => + val fiber = rts.unsafe.fork(queue.offer(ev)) + fiber.unsafe.addObserver { + case Exit.Success(_) => () + case Exit.Failure(c) => if (c.isInterrupted) self ! PoisonPill // stop listening if the queue was shut down + } } () case _ => () diff --git a/src/main/scala/zio/akka/cluster/pubsub/impl/SubscriberImpl.scala b/src/main/scala/zio/akka/cluster/pubsub/impl/SubscriberImpl.scala index ad723fd..21b2828 100644 --- a/src/main/scala/zio/akka/cluster/pubsub/impl/SubscriberImpl.scala +++ b/src/main/scala/zio/akka/cluster/pubsub/impl/SubscriberImpl.scala @@ -2,10 +2,9 @@ package zio.akka.cluster.pubsub.impl import akka.actor.{ Actor, ActorRef, ActorSystem, PoisonPill, Props } import akka.cluster.pubsub.DistributedPubSubMediator.{ Subscribe, SubscribeAck } -import zio.Exit.{ Failure, Success } import zio.akka.cluster.pubsub.impl.SubscriberImpl.SubscriberActor import zio.akka.cluster.pubsub.{ MessageEnvelope, Subscriber } -import zio.{ Promise, Queue, Runtime, Task, ZIO } +import zio.{ Exit, Promise, Queue, Runtime, Task, Unsafe, ZIO } private[pubsub] trait SubscriberImpl[A] extends Subscriber[A] { val getActorSystem: ActorSystem @@ -36,12 +35,17 @@ object SubscriberImpl { def receive: Actor.Receive = { case SubscribeAck(_) => - rts.unsafeRunSync(subscribed.succeed(())) + Unsafe.unsafeCompat { implicit u => + rts.unsafe.run(subscribed.succeed(())).getOrThrow() + } () case MessageEnvelope(msg) => - rts.unsafeRunAsyncWith(queue.offer(msg.asInstanceOf[A])) { - case Success(_) => () - case Failure(cause) => if (cause.isInterrupted) self ! PoisonPill // stop listening if the queue was shut down + Unsafe.unsafeCompat { implicit u => + val fiber = rts.unsafe.fork(queue.offer(msg.asInstanceOf[A])) + fiber.unsafe.addObserver { + case Exit.Success(_) => () + case Exit.Failure(c) => if (c.isInterrupted) self ! PoisonPill // stop listening if the queue was shut down + } } () } diff --git a/src/main/scala/zio/akka/cluster/sharding/Sharding.scala b/src/main/scala/zio/akka/cluster/sharding/Sharding.scala index c9fc9d5..4efa85f 100644 --- a/src/main/scala/zio/akka/cluster/sharding/Sharding.scala +++ b/src/main/scala/zio/akka/cluster/sharding/Sharding.scala @@ -9,7 +9,7 @@ import akka.pattern.{ ask => askPattern } import akka.util.Timeout import zio.akka.cluster.sharding import zio.akka.cluster.sharding.MessageEnvelope.{ MessagePayload, PassivatePayload, PoisonPillPayload } -import zio.{ =!=, Ref, Runtime, Tag, Task, UIO, ZIO, ZLayer } +import zio.{ =!=, Ref, Runtime, Tag, Task, UIO, Unsafe, ZIO, ZLayer } /** * A `Sharding[M]` is able to send messages of type `M` to a sharded entity or to stop one. @@ -133,7 +133,10 @@ object Sharding { onMessage: Msg => ZIO[Entity[State] with R, Nothing, Unit] ) extends Actor { - val ref: Ref[Option[State]] = rts.unsafeRun(Ref.make[Option[State]](None)) + val ref: Ref[Option[State]] = + Unsafe.unsafeCompat { implicit u => + rts.unsafe.run(Ref.make[Option[State]](None)).getOrThrow() + } val actorContext: ActorContext = context val service: Entity.Service[State] = new Entity.Service[State] { override def context: ActorContext = actorContext @@ -154,7 +157,9 @@ object Sharding { case p: Passivate => actorContext.parent ! p case MessagePayload(msg) => - rts.unsafeRunSync(onMessage(msg.asInstanceOf[Msg]).provideSomeLayer[R](entity)) + Unsafe.unsafeCompat { implicit u => + rts.unsafe.run(onMessage(msg.asInstanceOf[Msg]).provideSomeLayer[R](entity)).getOrThrow() + } () case _ => } From bf4ebd3c867c49a3b13c50f7179eade57ebc81b2 Mon Sep 17 00:00:00 2001 From: Pierre Ricadat Date: Sun, 26 Jun 2022 17:56:42 +0900 Subject: [PATCH 10/11] Update ci.yml --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 00e4570..1b5b8bc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -3,7 +3,7 @@ name: CI on: pull_request: push: - branches: ['master', 'series/2.x'] + branches: ['series/1.x', 'series/2.x'] release: types: - published From 73c55a176d0830a741c98dd9354a086113ef0fcd Mon Sep 17 00:00:00 2001 From: Scala Steward Date: Sun, 10 Jul 2022 07:45:41 +0000 Subject: [PATCH 11/11] Update sbt-ci-release to 1.5.10 --- project/plugins.sbt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/project/plugins.sbt b/project/plugins.sbt index 53e1a14..6b58e2d 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,3 +1,3 @@ -addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.5") -addSbtPlugin("com.geirsson" % "sbt-ci-release" % "1.5.7") -addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.9.33") +addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.5") +addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.5.10") +addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.9.33")