Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update sbt-ci-release to 1.5.10 #158

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: CI
on:
pull_request:
push:
branches: ['master']
branches: ['series/1.x', 'series/2.x']
release:
types:
- published
Expand Down
34 changes: 20 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ See [Akka Documentation](https://doc.akka.io/docs/akka/current/cluster-usage.htm
You can also manually join a cluster using `Cluster.join`.

```scala
def join(seedNodes: List[Address]): ZIO[Has[ActorSystem], Throwable, Unit]
def join(seedNodes: List[Address]): ZIO[ActorSystem, Throwable, Unit]
```

It's possible to get the status of the cluster by calling `Cluster.clusterState`

```scala
val clusterState: ZIO[Has[ActorSystem], Throwable, CurrentClusterState]
val clusterState: ZIO[ActorSystem, Throwable, CurrentClusterState]
```

To monitor the cluster and be informed of changes (e.g. new members, member unreachable, etc), use `Cluster.clusterEvents`.
Expand All @@ -54,13 +54,13 @@ To unsubscribe, simply `shutdown` the queue.
`initialStateAsEvents` indicates if you want to receive previous cluster events leading to the current state, or only future events.

```scala
def clusterEvents(initialStateAsEvents: Boolean = false): ZIO[Has[ActorSystem], Throwable, Queue[ClusterDomainEvent]]
def clusterEvents(initialStateAsEvents: Boolean = false): ZIO[ActorSystem, Throwable, Queue[ClusterDomainEvent]]
```

Finally, you can leave the current cluster using `Cluster.leave`.

```scala
val leave: ZIO[Has[ActorSystem], Throwable, Unit]
val leave: ZIO[ActorSystem, Throwable, Unit]
```

### Akka PubSub
Expand All @@ -75,7 +75,7 @@ See [Akka Documentation](https://doc.akka.io/docs/akka/current/distributed-pub-s
To create a `PubSub` object which can both publish and subscribe, use `PubSub.createPubSub`.

```scala
def createPubSub[A]: ZIO[Has[ActorSystem], Throwable, PubSub[A]]
def createPubSub[A]: ZIO[ActorSystem, Throwable, PubSub[A]]
```

There are also less powerful variants `PubSub.createPublisher` if you only need to publish and `PubSub.createSubscriber` if you only need to subscribe.
Expand Down Expand Up @@ -111,11 +111,14 @@ This library wraps messages inside of a `zio.akka.cluster.pubsub.MessageEnvelope

```scala
import akka.actor.ActorSystem
import zio.{ Has, Managed, Task, ZLayer }
import zio.{ ZIO, ZLayer }
import zio.akka.cluster.pubsub.PubSub

val actorSystem: ZLayer[Any, Throwable, Has[ActorSystem]] =
ZLayer.fromManaged(Managed.make(Task(ActorSystem("Test")))(sys => Task.fromFuture(_ => sys.terminate()).either))
val actorSystem: ZLayer[Any, Throwable, ActorSystem] =
ZLayer
.scoped(
ZIO.acquireRelease(ZIO.attempt(ActorSystem("Test")))(sys => ZIO.fromFuture(_ => sys.terminate()).either)
)

(for {
pubSub <- PubSub.createPubSub[String]
Expand Down Expand Up @@ -143,7 +146,7 @@ def start[R, Msg, State](
name: String,
onMessage: Msg => ZIO[Entity[State] with R, Nothing, Unit],
numberOfShards: Int = 100
): ZIO[Has[ActorSystem] with R, Throwable, Sharding[Msg]]
): ZIO[ActorSystem with R, Throwable, Sharding[Msg]]
```

It requires:
Expand Down Expand Up @@ -176,14 +179,17 @@ This library wraps messages inside of a `zio.akka.cluster.sharding.MessageEnvelo
```scala
import akka.actor.ActorSystem
import zio.akka.cluster.sharding.{ Entity, Sharding }
import zio.{ Has, Managed, Task, ZIO, ZLayer }
import zio.{ ZIO, ZLayer }

val actorSystem: ZLayer[Any, Throwable, Has[ActorSystem]] =
ZLayer.fromManaged(Managed.make(Task(ActorSystem("Test")))(sys => Task.fromFuture(_ => sys.terminate()).either))
val actorSystem: ZLayer[Any, Throwable, ActorSystem] =
ZLayer
.scoped(
ZIO.acquireRelease(ZIO.attempt(ActorSystem("Test")))(sys => ZIO.fromFuture(_ => sys.terminate()).either)
)

val behavior: String => ZIO[Entity[Int], Nothing, Unit] = {
case "+" => ZIO.accessM[Entity[Int]](_.get.state.update(x => Some(x.getOrElse(0) + 1)))
case "-" => ZIO.accessM[Entity[Int]](_.get.state.update(x => Some(x.getOrElse(0) - 1)))
case "+" => ZIO.serviceWithZIO[Entity[Int]](_.state.update(x => Some(x.getOrElse(0) + 1)))
case "-" => ZIO.serviceWithZIO[Entity[Int]](_.state.update(x => Some(x.getOrElse(0) - 1)))
case _ => ZIO.unit
}

Expand Down
37 changes: 15 additions & 22 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
val mainScala = "2.13.1"
val allScala = Seq("2.11.12", "2.12.10", mainScala)
val mainScala = "2.13.7"
val allScala = Seq("2.12.15", mainScala)

val zioVersion = "2.0.0"
val akkaVersion = "2.6.19"

organization := "dev.zio"
homepage := Some(url("https://github.com/zio/zio-akka-cluster"))
name := "zio-akka-cluster"
licenses := List("Apache-2.0" -> url("http://www.apache.org/licenses/LICENSE-2.0"))
scalaVersion := mainScala
parallelExecution in Test := false
fork in Test := true
Test / parallelExecution := false
Test / fork := true
pgpPublicRing := file("/tmp/public.asc")
pgpSecretRing := file("/tmp/secret.asc")
scmInfo := Some(
Expand All @@ -23,13 +26,13 @@ developers := List(
)

libraryDependencies ++= Seq(
"dev.zio" %% "zio" % "1.0.13",
"dev.zio" %% "zio-streams" % "1.0.13",
"com.typesafe.akka" %% "akka-cluster-tools" % "2.5.32",
"com.typesafe.akka" %% "akka-cluster-sharding" % "2.5.32",
"dev.zio" %% "zio-test" % "1.0.13" % "test",
"dev.zio" %% "zio-test-sbt" % "1.0.13" % "test",
compilerPlugin("org.typelevel" %% "kind-projector" % "0.10.3"),
"dev.zio" %% "zio" % zioVersion,
"dev.zio" %% "zio-streams" % zioVersion,
"com.typesafe.akka" %% "akka-cluster-tools" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion,
"dev.zio" %% "zio-test" % zioVersion % "test",
"dev.zio" %% "zio-test-sbt" % zioVersion % "test",
compilerPlugin("org.typelevel" %% "kind-projector" % "0.13.2" cross CrossVersion.full),
compilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1")
)

Expand All @@ -50,16 +53,6 @@ scalacOptions ++= Seq(
"-Ywarn-unused",
"-Ywarn-value-discard"
) ++ (CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, 11)) =>
Seq(
"-Yno-adapted-args",
"-Ypartial-unification",
"-Ywarn-inaccessible",
"-Ywarn-infer-any",
"-Ywarn-nullary-override",
"-Ywarn-nullary-unit",
"-Xfuture"
)
case Some((2, 12)) =>
Seq(
"-Xsource:2.13",
Expand All @@ -78,7 +71,7 @@ scalacOptions ++= Seq(
case _ => Nil
})

fork in run := true
run / fork := true

crossScalaVersions := allScala

Expand Down
5 changes: 3 additions & 2 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.0")
addSbtPlugin("com.geirsson" % "sbt-ci-release" % "1.5.7")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.5")
addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.5.10")
addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.9.33")
45 changes: 24 additions & 21 deletions src/main/scala/zio/akka/cluster/Cluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,41 @@ package zio.akka.cluster

import akka.actor.{ Actor, ActorSystem, Address, PoisonPill, Props }
import akka.cluster.ClusterEvent._
import zio.Exit.{ Failure, Success }
import zio.{ Has, Queue, Runtime, Task, ZIO }
import zio.{ Exit, Queue, Runtime, Unsafe, ZIO }

object Cluster {

private val cluster: ZIO[Has[ActorSystem], Throwable, akka.cluster.Cluster] =
private val cluster: ZIO[ActorSystem, Throwable, akka.cluster.Cluster] =
for {
actorSystem <- ZIO.access[Has[ActorSystem]](_.get)
cluster <- Task(akka.cluster.Cluster(actorSystem))
actorSystem <- ZIO.service[ActorSystem]
cluster <- ZIO.attempt(akka.cluster.Cluster(actorSystem))
} yield cluster

/**
* Returns the current state of the cluster.
*/
val clusterState: ZIO[Has[ActorSystem], Throwable, CurrentClusterState] =
val clusterState: ZIO[ActorSystem, Throwable, CurrentClusterState] =
for {
cluster <- cluster
state <- Task(cluster.state)
state <- ZIO.attempt(cluster.state)
} yield state

/**
* Joins a cluster using the provided seed nodes.
*/
def join(seedNodes: List[Address]): ZIO[Has[ActorSystem], Throwable, Unit] =
def join(seedNodes: List[Address]): ZIO[ActorSystem, Throwable, Unit] =
for {
cluster <- cluster
_ <- Task(cluster.joinSeedNodes(seedNodes))
_ <- ZIO.attempt(cluster.joinSeedNodes(seedNodes))
} yield ()

/**
* Leaves the current cluster.
*/
val leave: ZIO[Has[ActorSystem], Throwable, Unit] =
val leave: ZIO[ActorSystem, Throwable, Unit] =
for {
cluster <- cluster
_ <- Task(cluster.leave(cluster.selfAddress))
_ <- ZIO.attempt(cluster.leave(cluster.selfAddress))
} yield ()

/**
Expand All @@ -48,7 +47,7 @@ object Cluster {
*/
def clusterEvents(
initialStateAsEvents: Boolean = false
): ZIO[Has[ActorSystem], Throwable, Queue[ClusterDomainEvent]] =
): ZIO[ActorSystem, Throwable, Queue[ClusterDomainEvent]] =
Queue.unbounded[ClusterDomainEvent].tap(clusterEventsWith(_, initialStateAsEvents))

/**
Expand All @@ -59,11 +58,11 @@ object Cluster {
def clusterEventsWith(
queue: Queue[ClusterDomainEvent],
initialStateAsEvents: Boolean = false
): ZIO[Has[ActorSystem], Throwable, Unit] =
): ZIO[ActorSystem, Throwable, Unit] =
for {
rts <- Task.runtime
actorSystem <- ZIO.access[Has[ActorSystem]](_.get)
_ <- Task(actorSystem.actorOf(Props(new SubscriberActor(rts, queue, initialStateAsEvents))))
rts <- ZIO.runtime[ActorSystem]
actorSystem <- ZIO.service[ActorSystem]
_ <- ZIO.attempt(actorSystem.actorOf(Props(new SubscriberActor(rts, queue, initialStateAsEvents))))
} yield ()

private[cluster] class SubscriberActor(
Expand All @@ -76,13 +75,17 @@ object Cluster {
if (initialStateAsEvents) InitialStateAsEvents else InitialStateAsSnapshot
akka.cluster.Cluster(context.system).subscribe(self, initialState, classOf[ClusterDomainEvent])

def receive: PartialFunction[Any, Unit] = {
def receive: Actor.Receive = {
case ev: ClusterDomainEvent =>
rts.unsafeRunAsync(queue.offer(ev)) {
case Success(_) => ()
case Failure(cause) => if (cause.interrupted) self ! PoisonPill // stop listening if the queue was shut down
Unsafe.unsafeCompat { implicit u =>
val fiber = rts.unsafe.fork(queue.offer(ev))
fiber.unsafe.addObserver {
case Exit.Success(_) => ()
case Exit.Failure(c) => if (c.isInterrupted) self ! PoisonPill // stop listening if the queue was shut down
}
}
case _ =>
()
case _ => ()
}
}

Expand Down
17 changes: 9 additions & 8 deletions src/main/scala/zio/akka/cluster/pubsub/PubSub.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package zio.akka.cluster.pubsub
import akka.actor.{ ActorRef, ActorSystem }
import akka.cluster.pubsub.DistributedPubSub
import zio.akka.cluster.pubsub.impl.{ PublisherImpl, SubscriberImpl }
import zio.{ Has, Queue, Task, ZIO }
import zio.{ Queue, Task, ZIO }

/**
* A `Publisher[A]` is able to send messages of type `A` through Akka PubSub.
Expand All @@ -30,14 +30,15 @@ trait PubSub[A] extends Publisher[A] with Subscriber[A]

object PubSub {

private def getMediator(actorSystem: ActorSystem): Task[ActorRef] = Task(DistributedPubSub(actorSystem).mediator)
private def getMediator(actorSystem: ActorSystem): Task[ActorRef] =
ZIO.attempt(DistributedPubSub(actorSystem).mediator)

/**
* Creates a new `Publisher[A]`.
*/
def createPublisher[A]: ZIO[Has[ActorSystem], Throwable, Publisher[A]] =
def createPublisher[A]: ZIO[ActorSystem, Throwable, Publisher[A]] =
for {
actorSystem <- ZIO.access[Has[ActorSystem]](_.get)
actorSystem <- ZIO.service[ActorSystem]
mediator <- getMediator(actorSystem)
} yield new Publisher[A] with PublisherImpl[A] {
override val getMediator: ActorRef = mediator
Expand All @@ -46,9 +47,9 @@ object PubSub {
/**
* Creates a new `Subscriber[A]`.
*/
def createSubscriber[A]: ZIO[Has[ActorSystem], Throwable, Subscriber[A]] =
def createSubscriber[A]: ZIO[ActorSystem, Throwable, Subscriber[A]] =
for {
actorSystem <- ZIO.access[Has[ActorSystem]](_.get)
actorSystem <- ZIO.service[ActorSystem]
mediator <- getMediator(actorSystem)
} yield new Subscriber[A] with SubscriberImpl[A] {
override val getActorSystem: ActorSystem = actorSystem
Expand All @@ -58,9 +59,9 @@ object PubSub {
/**
* Creates a new `PubSub[A]`.
*/
def createPubSub[A]: ZIO[Has[ActorSystem], Throwable, PubSub[A]] =
def createPubSub[A]: ZIO[ActorSystem, Throwable, PubSub[A]] =
for {
actorSystem <- ZIO.access[Has[ActorSystem]](_.get)
actorSystem <- ZIO.service[ActorSystem]
mediator <- getMediator(actorSystem)
} yield new PubSub[A] with PublisherImpl[A] with SubscriberImpl[A] {
override val getActorSystem: ActorSystem = actorSystem
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package zio.akka.cluster.pubsub.impl
import akka.actor.ActorRef
import akka.cluster.pubsub.DistributedPubSubMediator.Publish
import zio.akka.cluster.pubsub.{ MessageEnvelope, Publisher }
import zio.Task
import zio.{ Task, ZIO }

private[pubsub] trait PublisherImpl[A] extends Publisher[A] {
val getMediator: ActorRef

override def publish(topic: String, data: A, sendOneMessageToEachGroup: Boolean = false): Task[Unit] =
Task(getMediator ! Publish(topic, MessageEnvelope(data), sendOneMessageToEachGroup))
ZIO.attempt(getMediator ! Publish(topic, MessageEnvelope(data), sendOneMessageToEachGroup))
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,19 @@ package zio.akka.cluster.pubsub.impl

import akka.actor.{ Actor, ActorRef, ActorSystem, PoisonPill, Props }
import akka.cluster.pubsub.DistributedPubSubMediator.{ Subscribe, SubscribeAck }
import zio.Exit.{ Failure, Success }
import zio.akka.cluster.pubsub.impl.SubscriberImpl.SubscriberActor
import zio.akka.cluster.pubsub.{ MessageEnvelope, Subscriber }
import zio.{ Promise, Queue, Runtime, Task }
import zio.{ Exit, Promise, Queue, Runtime, Task, Unsafe, ZIO }

private[pubsub] trait SubscriberImpl[A] extends Subscriber[A] {
val getActorSystem: ActorSystem
val getMediator: ActorRef

override def listenWith(topic: String, queue: Queue[A], group: Option[String] = None): Task[Unit] =
for {
rts <- Task.runtime
rts <- ZIO.runtime[Any]
subscribed <- Promise.make[Nothing, Unit]
_ <- Task(
_ <- ZIO.attempt(
getActorSystem.actorOf(Props(new SubscriberActor[A](getMediator, topic, group, rts, queue, subscribed)))
)
_ <- subscribed.await
Expand All @@ -34,15 +33,21 @@ object SubscriberImpl {

mediator ! Subscribe(topic, group, self)

def receive: PartialFunction[Any, Unit] = {
def receive: Actor.Receive = {
case SubscribeAck(_) =>
rts.unsafeRunSync(subscribed.succeed(()))
Unsafe.unsafeCompat { implicit u =>
rts.unsafe.run(subscribed.succeed(())).getOrThrow()
}
()
case MessageEnvelope(msg) =>
rts.unsafeRunAsync(queue.offer(msg.asInstanceOf[A])) {
case Success(_) => ()
case Failure(cause) => if (cause.interrupted) self ! PoisonPill // stop listening if the queue was shut down
Unsafe.unsafeCompat { implicit u =>
val fiber = rts.unsafe.fork(queue.offer(msg.asInstanceOf[A]))
fiber.unsafe.addObserver {
case Exit.Success(_) => ()
case Exit.Failure(c) => if (c.isInterrupted) self ! PoisonPill // stop listening if the queue was shut down
}
}
()
}
}
}
Loading