Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix unlawful instances #595

Merged
merged 10 commits into from
Sep 23, 2022
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package zio.interop

import cats.effect.{ IO as CIO, LiftIO }
import cats.effect.{ Async, IO as CIO, LiftIO, Outcome }
import cats.effect.kernel.{ Concurrent, Resource }
import zio.interop.catz.*
import zio.test.*
import zio.{ Promise, Task, ZIO }
import zio.*

object CatsInteropSpec extends CatsRunnableSpec {
def spec = suite("Cats interop")(
Expand All @@ -25,6 +25,184 @@ object CatsInteropSpec extends CatsRunnableSpec {
_ <- lift.liftIO(promise1.get)
_ <- fiber.interrupt
} yield assertCompletes
},
test("ZIO respects Async#async cancel finalizer") {
def test[F[_]](implicit F: Async[F]) = {
import cats.syntax.all.*
import cats.effect.syntax.all.*
for {
counter <- F.ref(0)
latch <- F.deferred[Unit]
fiber <- F.start(
F.async[Unit] { _ =>
for {
_ <- latch.complete(())
_ <- counter.update(_ + 1)
} yield Some(counter.update(_ + 1))
}.forceR(counter.update(_ + 9000))
)
_ <- latch.get
_ <- fiber.cancel
res <- counter.get
} yield assertTrue(res == 2)
}

for {
sanityCheckCIO <- fromEffect(test[CIO])
zioResult <- test[Task]
} yield zioResult && sanityCheckCIO
},
test("onCancel is not triggered by ZIO.parTraverse + ZIO.fail https://github.com/zio/zio/issues/6911") {
val F = Concurrent[Task]

for {
counter <- F.ref("")
_ <- F.guaranteeCase(
F.onError(
F.onCancel(
ZIO.collectAllPar(
List(
ZIO.unit.forever,
counter.update(_ + "A") *> ZIO.fail(new RuntimeException("x")).unit
)
),
counter.update(_ + "1")
)
) { case _ => counter.update(_ + "B") }
) {
case Outcome.Errored(_) => counter.update(_ + "C")
case Outcome.Canceled() => counter.update(_ + "2")
case Outcome.Succeeded(_) => counter.update(_ + "3")
}.exit
res <- counter.get
} yield assertTrue(!res.contains("1")) && assertTrue(res == "ABC")
},
test("onCancel is not triggered by ZIO.parTraverse + ZIO.die https://github.com/zio/zio/issues/6911") {
val F = Concurrent[Task]

for {
counter <- F.ref("")
_ <- F.guaranteeCase(
F.onError(
F.onCancel(
ZIO.collectAllPar(
List(
ZIO.unit.forever,
counter.update(_ + "A") *> ZIO.die(new RuntimeException("x")).unit
)
),
counter.update(_ + "1")
)
) { case _ => counter.update(_ + "B") }
) {
case Outcome.Errored(_) => counter.update(_ + "C")
case Outcome.Canceled() => counter.update(_ + "2")
case Outcome.Succeeded(_) => counter.update(_ + "3")
}.exit
res <- counter.get
} yield assertTrue(!res.contains("1")) && assertTrue(res == "AC")
},
test("onCancel is not triggered by ZIO.parTraverse + ZIO.interrupt https://github.com/zio/zio/issues/6911") {
val F = Concurrent[Task]

for {
counter <- F.ref("")
_ <- F.guaranteeCase(
F.onError(
F.onCancel(
ZIO.collectAllPar(
List(
ZIO.unit.forever,
counter.update(_ + "A") *> ZIO.interrupt.unit
)
),
counter.update(_ + "1")
)
) { case _ => counter.update(_ + "B") }
) {
case Outcome.Errored(_) => counter.update(_ + "C")
case Outcome.Canceled() => counter.update(_ + "2")
case Outcome.Succeeded(_) => counter.update(_ + "3")
}.exit
res <- counter.get
} yield assertTrue(!res.contains("1")) && assertTrue(res == "AC")
},
test(
"onCancel is triggered when a fiber executing ZIO.parTraverse + ZIO.fail is interrupted and the inner typed" +
" error is, unlike ZIO 1, preserved in final Cause (in ZIO 1 Fail & Interrupt nodes CAN both exist in Cause after external interruption)"
) {
val F = Concurrent[Task]

def println(s: String): Unit = {
val _ = s
}

for {
latch1 <- F.deferred[Unit]
latch2 <- F.deferred[Unit]
latch3 <- F.deferred[Unit]
counter <- F.ref("")
cause <- F.ref(Option.empty[Cause[Throwable]])
fiberId <- ZIO.fiberId
fiber <- F.guaranteeCase(
F.onError(
F.onCancel(
ZIO
.collectAllPar(
List(
F.onCancel(
ZIO.never,
ZIO.succeed(println("A")) *> latch2.complete(()).unit
).onExit(_ => ZIO.succeed(println("XA"))),
(latch1.complete(()) *> latch3.get *> ZIO.succeed(println("C"))).uninterruptible,
counter.update(_ + "A") *>
latch1.get *>
ZIO.succeed(println("B")) *> ZIO.fail(new RuntimeException("The_Error")).unit
)
)
.onExit {
case Exit.Success(_) => ZIO.unit
case Exit.Failure(c) => cause.set(Some(c)).orDie
},
counter.update(_ + "B")
)
) { case _ => counter.update(_ + "1") }
) {
case Outcome.Errored(_) => counter.update(_ + "2")
case Outcome.Canceled() => counter.update(_ + "C")
case Outcome.Succeeded(_) => counter.update(_ + "3")
}.fork
_ = println("x1")
_ <- latch2.get
_ = println("x2")
_ <- fiber.interruptFork
_ = println("x3")
_ <- latch3.complete(())
_ <- fiber.interrupt
_ = println("x4")
res <- counter.get
cause <- cause.get
} yield assertTrue(!res.contains("1")) &&
assertTrue(res == "ABC") &&
assertTrue(cause.isDefined) &&
assertTrue(cause.get.prettyPrint.contains("The_Error")) &&
assertTrue(cause.get.interruptors.contains(fiberId))
},
test("F.canceled.toEffect results in CancellationException, not BoxedException") {
val F = Concurrent[Task]

val exception: Option[Throwable] =
try {
F.canceled.toEffect[cats.effect.IO].unsafeRunSync()
None
} catch {
case t: Throwable => Some(t)
}

assertTrue(
!exception.get.getMessage.contains("Boxed Exception") &&
exception.get.getMessage.contains("The fiber was canceled")
)
}
)
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package zio.interop

import cats.effect.kernel.Resource
import cats.effect.kernel.{ Concurrent, Resource }
import cats.effect.IO as CIO
import zio.*
import zio.interop.catz.*
Expand All @@ -15,13 +15,39 @@ object CatsZManagedSyntaxSpec extends CatsRunnableSpec {
def spec =
suite("CatsZManagedSyntaxSpec")(
suite("toManaged")(
test("calls finalizers correctly when use is interrupted") {
test("calls finalizers correctly when use is externally interrupted") {
val effects = new mutable.ListBuffer[Int]
def res(x: Int): Resource[CIO, Unit] =
Resource.makeCase(CIO.delay(effects += x).void) {
case (_, Resource.ExitCase.Canceled) =>
CIO.delay(effects += x + 1).void
case _ => CIO.unit
case (_, _) =>
CIO.unit
}

val testCase = {
val managed: ZManaged[Any, Throwable, Unit] = res(1).toManaged
Promise.make[Nothing, Unit].flatMap { latch =>
managed
.use(_ => latch.succeed(()) *> ZIO.never)
.forkDaemon
.flatMap(latch.await *> _.interrupt)
}
}

for {
_ <- testCase
effects <- ZIO.succeed(effects.toList)
} yield assert(effects)(equalTo(List(1, 2)))
},
test("calls finalizers correctly when use is internally interrupted") {
val effects = new mutable.ListBuffer[Int]
def res(x: Int): Resource[CIO, Unit] =
Resource.makeCase(CIO.delay(effects += x).void) {
case (_, Resource.ExitCase.Errored(_)) =>
CIO.delay(effects += x + 1).void
case (_, _) =>
CIO.unit
}

val testCase = {
Expand Down Expand Up @@ -128,7 +154,7 @@ object CatsZManagedSyntaxSpec extends CatsRunnableSpec {
}
),
suite("toManagedZIO")(
test("calls finalizers correctly when use is interrupted") {
test("calls finalizers correctly when use is externally interrupted") {
val effects = new mutable.ListBuffer[Int]
def res(x: Int): Resource[Task, Unit] =
Resource.makeCase(ZIO.attempt(effects += x).unit) {
Expand All @@ -137,6 +163,30 @@ object CatsZManagedSyntaxSpec extends CatsRunnableSpec {
case _ => ZIO.unit
}

val testCase = {
val managed: ZManaged[Any, Throwable, Unit] = res(1).toManagedZIO
Promise.make[Nothing, Unit].flatMap { latch =>
managed
.use(_ => latch.succeed(()) *> ZIO.never)
.forkDaemon
.flatMap(latch.await *> _.interrupt)
}
}

for {
_ <- testCase
effects <- ZIO.succeed(effects.toList)
} yield assert(effects)(equalTo(List(1, 2)))
},
test("calls finalizers correctly when use is internally interrupted") {
val effects = new mutable.ListBuffer[Int]
def res(x: Int): Resource[Task, Unit] =
Resource.makeCase(ZIO.attempt(effects += x).unit) {
case (_, Resource.ExitCase.Errored(_)) =>
ZIO.attempt(effects += x + 1).unit
case _ => ZIO.unit
}

val testCase = {
val managed: ZManaged[Any, Throwable, Unit] = res(1).toManagedZIO
managed.use(_ => ZIO.interrupt.unit)
Expand Down Expand Up @@ -268,13 +318,13 @@ object CatsZManagedSyntaxSpec extends CatsRunnableSpec {
effects <- ZIO.succeed(effects.toList)
} yield assert(effects)(equalTo(List(1, 2)))
},
test("calls finalizers when using resource is canceled") {
test("calls finalizers when using resource is internally interrupted") {
val effects = new mutable.ListBuffer[Int]
def man(x: Int): ZManaged[Any, Throwable, Unit] =
ZManaged.acquireReleaseExitWith(ZIO.succeed(effects += x).unit) {
case (_, e) if e.isInterrupted =>
case (_, Exit.Failure(c)) if !c.isInterrupted && c.failureOption.nonEmpty =>
ZIO.succeed(effects += x + 1)
case _ =>
case _ =>
ZIO.unit
}

Expand All @@ -284,6 +334,28 @@ object CatsZManagedSyntaxSpec extends CatsRunnableSpec {
effects <- ZIO.succeed(effects.toList)
} yield assert(effects)(equalTo(List(1, 2)))
},
test("calls finalizers when using resource is externally interrupted") {
val effects = new mutable.ListBuffer[Int]
def man(x: Int): ZManaged[Any, Throwable, Unit] =
ZManaged.acquireReleaseExitWith(ZIO.succeed(effects += x).unit) {
case (_, e) if e.isInterrupted =>
ZIO.succeed(effects += x + 1)
case _ =>
ZIO.unit
}

val exception: Option[Throwable] =
try {
man(1).toResource[Task].use(_ => Concurrent[Task].canceled).toEffect[cats.effect.IO].unsafeRunSync()
None
} catch {
case t: Throwable => Some(t)
}

assert(effects.toList)(equalTo(List(1, 2))) && assertTrue(
exception.get.getMessage.contains("The fiber was canceled")
)
},
test("acquisition of Reservation preserves cancellability in new F") {
for {
startLatch <- Promise.make[Nothing, Unit]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,30 @@ class CatsSpec extends ZioSpecBase {
"Temporal[Task]",
implicit tc => GenTemporalTests[Task, Throwable].temporal[Int, Int, Int](100.millis)
)
checkAllAsync("GenSpawn[IO[Int, _], Int]", implicit tc => GenSpawnTests[IO[Int, _], Int].spawn[Int, Int, Int])
checkAllAsync("MonadError[IO[In t, _]]", implicit tc => MonadErrorTests[IO[Int, _], Int].monadError[Int, Int, Int])

locally {
checkAllAsync(
"GenTemporal[IO[Int, _], Cause[Int]]",
{ implicit tc =>
import zio.interop.catz.generic.*
GenTemporalTests[IO[Int, _], Cause[Int]].temporal[Int, Int, Int](100.millis)
}
)
checkAllAsync(
"GenSpawn[IO[Int, _], Cause[Int]]",
{ implicit tc =>
import zio.interop.catz.generic.*
GenSpawnTests[IO[Int, _], Cause[Int]].spawn[Int, Int, Int]
}
)
checkAllAsync(
"MonadCancel[IO[In t, _], Cause[Int]]",
{ implicit tc =>
import zio.interop.catz.generic.*
MonadCancelTests[IO[Int, _], Cause[Int]].monadCancel[Int, Int, Int]
}
)
}
checkAllAsync("MonoidK[IO[Int, _]]", implicit tc => MonoidKTests[IO[Int, _]].monoidK[Int])
checkAllAsync("SemigroupK[IO[Option[Unit], _]]", implicit tc => SemigroupKTests[IO[Option[Unit], _]].semigroupK[Int])
checkAllAsync("SemigroupK[Task]", implicit tc => SemigroupKTests[Task].semigroupK[Int])
Expand All @@ -46,9 +68,13 @@ class CatsSpec extends ZioSpecBase {

Async[RIO[ZClock, _]]
Sync[RIO[ZClock, _]]
GenTemporal[ZIO[ZClock, Int, _], Int]
locally {
import zio.interop.catz.generic.*

GenTemporal[ZIO[ZClock, Int, _], Cause[Int]]
GenConcurrent[ZIO[String, Int, _], Cause[Int]]
}
Temporal[RIO[ZClock, _]]
GenConcurrent[ZIO[String, Int, _], Int]
Concurrent[RIO[String, _]]
MonadError[RIO[String, _], Throwable]
Monad[RIO[String, _]]
Expand All @@ -66,7 +92,10 @@ class CatsSpec extends ZioSpecBase {

def liftRIO(implicit runtime: IORuntime) = LiftIO[RIO[String, _]]
def liftZManaged(implicit runtime: IORuntime) = LiftIO[ZManaged[String, Throwable, _]]
def runtimeGenTemporal(implicit runtime: Runtime[ZClock]) = GenTemporal[ZIO[Any, Int, _], Int]
def runtimeGenTemporal(implicit runtime: Runtime[ZClock]) = {
import zio.interop.catz.generic.*
GenTemporal[ZIO[Any, Int, _], Cause[Int]]
}
def runtimeTemporal(implicit runtime: Runtime[ZClock]) = Temporal[Task]
}

Expand Down
Loading