From 5e6926083b6610f8833f4ee3c3fc52a0426badf3 Mon Sep 17 00:00:00 2001 From: johnspade Date: Tue, 2 Aug 2022 00:00:36 +0300 Subject: [PATCH 1/9] Fix #541 #509 respect finalizer in Async#async implementation #542 https://github.com/zio/interop-cats/pull/542 --- .../scala/zio/interop/CatsInteropSpec.scala | 28 ++++++++++++++++++- .../src/main/scala/zio/interop/ZioAsync.scala | 18 ++++++++---- 2 files changed, 39 insertions(+), 7 deletions(-) diff --git a/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsInteropSpec.scala b/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsInteropSpec.scala index 5fb7e087..6031c9d0 100644 --- a/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsInteropSpec.scala +++ b/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsInteropSpec.scala @@ -1,6 +1,6 @@ package zio.interop -import cats.effect.{ IO as CIO, LiftIO } +import cats.effect.{ Async, IO as CIO, LiftIO } import cats.effect.kernel.{ Concurrent, Resource } import zio.interop.catz.* import zio.test.* @@ -25,6 +25,32 @@ object CatsInteropSpec extends CatsRunnableSpec { _ <- lift.liftIO(promise1.get) _ <- fiber.interrupt } yield assertCompletes + }, + test("ZIO respects Async#async cancel finalizer") { + def test[F[_]](implicit F: Async[F]) = { + import cats.syntax.all.* + import cats.effect.syntax.all.* + for { + counter <- F.ref(0) + latch <- F.deferred[Unit] + fiber <- F.start( + F.async[Unit] { _ => + for { + _ <- latch.complete(()) + _ <- counter.update(_ + 1) + } yield Some(counter.update(_ + 1)) + }.forceR(counter.update(_ + 9000)) + ) + _ <- latch.get + _ <- fiber.cancel + res <- counter.get + } yield assertTrue(res == 2) + } + + for { + sanityCheckCIO <- fromEffect(test[CIO]) + zioResult <- test[Task] + } yield zioResult && sanityCheckCIO } ) } diff --git a/zio-interop-cats/jvm/src/main/scala/zio/interop/ZioAsync.scala b/zio-interop-cats/jvm/src/main/scala/zio/interop/ZioAsync.scala index e21fd78a..ccf36ce7 100644 --- a/zio-interop-cats/jvm/src/main/scala/zio/interop/ZioAsync.scala +++ b/zio-interop-cats/jvm/src/main/scala/zio/interop/ZioAsync.scala @@ -1,7 +1,7 @@ package zio.interop import cats.effect.kernel.{ Async, Cont, Sync, Unique } -import zio.{ Promise, RIO, ZIO } +import zio.{ Exit, Promise, RIO, ZIO } import scala.concurrent.{ ExecutionContext, Future } @@ -38,11 +38,17 @@ private class ZioAsync[R] extends ZioTemporal[R, Throwable] with Async[RIO[R, _] ZIO.attemptBlockingInterrupt(thunk) override final def async[A](k: (Either[Throwable, A] => Unit) => F[Option[F[Unit]]]): F[A] = - Promise.make[Nothing, Unit].flatMap { promise => - ZIO.asyncZIO { register => - k(either => register(promise.await *> ZIO.fromEither(either))) *> promise.succeed(()) - } - } + for { + cancelerPromise <- Promise.make[Nothing, Option[F[Unit]]] + res <- ZIO + .asyncZIO[R, Throwable, A] { resume => + k(exitEither => resume(ZIO.fromEither(exitEither))).onExit { + case Exit.Success(maybeCanceler) => cancelerPromise.succeed(maybeCanceler) + case _: Exit.Failure[?] => cancelerPromise.succeed(None) + } + } + .onInterrupt(cancelerPromise.await.flatMap(ZIO.foreach(_)(identity)).orDie) + } yield res override final def async_[A](k: (Either[Throwable, A] => Unit) => Unit): F[A] = ZIO.async(register => k(register.compose(fromEither))) From ac36acc8ac0f3bbbe2fc43b68b186604d03d4e81 Mon Sep 17 00:00:00 2001 From: johnspade Date: Tue, 2 Aug 2022 00:09:07 +0300 Subject: [PATCH 2/9] Always convert non-interrupt failures to Outcome.Errored. Define standard Concurrent/Temporal instances only for Throwable error. Define generic Concurrent/Temporal as operating on Cause[E] errors to be able to wrap non-interrupt errors to Outcome.Errored. #543 https://github.com/zio/interop-cats/pull/543 --- .../src/test/scala/zio/interop/CatsSpec.scala | 39 +++- .../test/scala/zio/interop/CatsSpecBase.scala | 32 ++- .../test/scala/zio/interop/ZioSpecBase.scala | 19 ++ .../src/main/scala/zio/interop/ZioAsync.scala | 31 +-- .../src/main/scala/zio/interop/ZioAsync.scala | 31 +-- .../src/main/scala/zio/interop/cats.scala | 204 +++++++++++++----- .../main/scala/zio/interop/catszmanaged.scala | 6 +- .../src/main/scala/zio/interop/package.scala | 47 +++- .../zio/stream/interop/FS2StreamSyntax.scala | 5 +- 9 files changed, 308 insertions(+), 106 deletions(-) diff --git a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpec.scala b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpec.scala index 2c44a676..30078067 100644 --- a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpec.scala +++ b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpec.scala @@ -21,8 +21,30 @@ class CatsSpec extends ZioSpecBase { "Temporal[Task]", implicit tc => GenTemporalTests[Task, Throwable].temporal[Int, Int, Int](100.millis) ) - checkAllAsync("GenSpawn[IO[Int, _], Int]", implicit tc => GenSpawnTests[IO[Int, _], Int].spawn[Int, Int, Int]) - checkAllAsync("MonadError[IO[In t, _]]", implicit tc => MonadErrorTests[IO[Int, _], Int].monadError[Int, Int, Int]) + + locally { + checkAllAsync( + "GenTemporal[IO[Int, _], Cause[Int]]", + { implicit tc => + import zio.interop.catz.generic.* + GenTemporalTests[IO[Int, _], Cause[Int]].temporal[Int, Int, Int](100.millis) + } + ) + checkAllAsync( + "GenSpawn[IO[Int, _], Cause[Int]]", + { implicit tc => + import zio.interop.catz.generic.* + GenSpawnTests[IO[Int, _], Cause[Int]].spawn[Int, Int, Int] + } + ) + checkAllAsync( + "MonadCancel[IO[In t, _], Cause[Int]]", + { implicit tc => + import zio.interop.catz.generic.* + MonadCancelTests[IO[Int, _], Cause[Int]].monadCancel[Int, Int, Int] + } + ) + } checkAllAsync("MonoidK[IO[Int, _]]", implicit tc => MonoidKTests[IO[Int, _]].monoidK[Int]) checkAllAsync("SemigroupK[IO[Option[Unit], _]]", implicit tc => SemigroupKTests[IO[Option[Unit], _]].semigroupK[Int]) checkAllAsync("SemigroupK[Task]", implicit tc => SemigroupKTests[Task].semigroupK[Int]) @@ -46,9 +68,13 @@ class CatsSpec extends ZioSpecBase { Async[RIO[ZClock, _]] Sync[RIO[ZClock, _]] - GenTemporal[ZIO[ZClock, Int, _], Int] + locally { + import zio.interop.catz.generic.* + + GenTemporal[ZIO[ZClock, Int, _], Cause[Int]] + GenConcurrent[ZIO[String, Int, _], Cause[Int]] + } Temporal[RIO[ZClock, _]] - GenConcurrent[ZIO[String, Int, _], Int] Concurrent[RIO[String, _]] MonadError[RIO[String, _], Throwable] Monad[RIO[String, _]] @@ -66,7 +92,10 @@ class CatsSpec extends ZioSpecBase { def liftRIO(implicit runtime: IORuntime) = LiftIO[RIO[String, _]] def liftZManaged(implicit runtime: IORuntime) = LiftIO[ZManaged[String, Throwable, _]] - def runtimeGenTemporal(implicit runtime: Runtime[ZClock]) = GenTemporal[ZIO[Any, Int, _], Int] + def runtimeGenTemporal(implicit runtime: Runtime[ZClock]) = { + import zio.interop.catz.generic.* + GenTemporal[ZIO[Any, Int, _], Cause[Int]] + } def runtimeTemporal(implicit runtime: Runtime[ZClock]) = Temporal[Task] } diff --git a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpecBase.scala b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpecBase.scala index a5c779d1..7ff91bb7 100644 --- a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpecBase.scala +++ b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpecBase.scala @@ -117,6 +117,9 @@ private[zio] trait CatsSpecBase Eq.allEqual implicit val eqForCauseOfNothing: Eq[Cause[Nothing]] = + eqForCauseOf[Nothing] + + implicit def eqForCauseOf[E]: Eq[Cause[E]] = (x, y) => (x.isInterrupted && y.isInterrupted) || x == y implicit def eqForExitOfNothing[A: Eq]: Eq[Exit[Nothing, A]] = { @@ -137,20 +140,37 @@ private[zio] trait CatsSpecBase implicit def eqForURIO[R: Arbitrary: Tag, A: Eq](implicit ticker: Ticker): Eq[URIO[R, A]] = eqForZIO[R, Nothing, A] - implicit def execTask(task: Task[Boolean])(implicit ticker: Ticker): Prop = - ZLayer.succeed(testClock).apply(task).toEffect[CIO] + implicit def execZIO[E](zio: ZIO[Any, E, Boolean])(implicit ticker: Ticker): Prop = + zio + .provideEnvironment(environment) + .mapError { + case t: Throwable => t + case e => FiberFailure(Cause.Fail(e, StackTrace.none)) + } + .toEffect[CIO] implicit def orderForUIOofFiniteDuration(implicit ticker: Ticker): Order[UIO[FiniteDuration]] = Order.by(unsafeRun(_).toEither.toOption) - implicit def orderForRIOofFiniteDuration[R: Arbitrary: Tag](implicit - ticker: Ticker - ): Order[RIO[R, FiniteDuration]] = + implicit def orderForRIOofFiniteDuration[R: Arbitrary: Tag](implicit ticker: Ticker): Order[RIO[R, FiniteDuration]] = (x, y) => Arbitrary .arbitrary[ZEnvironment[R]] .sample - .fold(0)(r => x.orDie.provideEnvironment(r) compare y.orDie.provideEnvironment(r)) + .fold(0)(r => orderForUIOofFiniteDuration.compare(x.orDie.provideEnvironment(r), y.orDie.provideEnvironment(r))) + + implicit def orderForZIOofFiniteDuration[E: Order, R: Arbitrary: Tag](implicit + ticker: Ticker + ): Order[ZIO[R, E, FiniteDuration]] = { + implicit val orderForIOofFiniteDuration: Order[IO[E, FiniteDuration]] = + Order.by(unsafeRun(_) match { + case Exit.Success(value) => Right(value) + case Exit.Failure(cause) => Left(cause.failureOption) + }) + + (x, y) => + Arbitrary.arbitrary[ZEnvironment[R]].sample.fold(0)(r => x.provideEnvironment(r) compare y.provideEnvironment(r)) + } implicit def eqForUManaged[A: Eq](implicit ticker: Ticker): Eq[UManaged[A]] = zManagedEq[Any, Nothing, A] diff --git a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/ZioSpecBase.scala b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/ZioSpecBase.scala index 3cd65e8a..5f5a27cc 100644 --- a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/ZioSpecBase.scala +++ b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/ZioSpecBase.scala @@ -17,6 +17,25 @@ private[interop] trait ZioSpecBase extends CatsSpecBase with ZioSpecBaseLowPrior implicit def arbitraryURManaged[R: Cogen: Tag, A: Arbitrary]: Arbitrary[URManaged[R, A]] = zManagedArbitrary[R, Nothing, A] + + implicit def arbitraryCause[E](implicit e: Arbitrary[E]): Arbitrary[Cause[E]] = { + lazy val self: Gen[Cause[E]] = + Gen.oneOf( + e.arbitrary.map(Cause.Fail(_, StackTrace.none)), + Arbitrary.arbitrary[Throwable].map(Cause.Die(_, StackTrace.none)), + // Generating interrupt failures causes law failures (`canceled`/`Outcome.Canceled` are ill-defined as of now https://github.com/zio/interop-cats/issues/503#issuecomment-1157101175=) +// Gen.long.flatMap(l1 => Gen.long.map(l2 => Cause.Interrupt(Fiber.Id(l1, l2)))), + Gen.delay(self.map(Cause.stack)), + Gen.delay(self.map(Cause.stackless)), + Gen.delay(self.flatMap(e1 => self.map(e2 => Cause.Both(e1, e2)))), + Gen.delay(self.flatMap(e1 => self.map(e2 => Cause.Then(e1, e2)))), + Gen.const(Cause.empty) + ) + Arbitrary(self) + } + + implicit def cogenCause[E]: Cogen[Cause[E]] = + Cogen(_.hashCode.toLong) } private[interop] trait ZioSpecBaseLowPriority { self: ZioSpecBase => diff --git a/zio-interop-cats/js/src/main/scala/zio/interop/ZioAsync.scala b/zio-interop-cats/js/src/main/scala/zio/interop/ZioAsync.scala index 2f36a528..455f9b4d 100644 --- a/zio-interop-cats/js/src/main/scala/zio/interop/ZioAsync.scala +++ b/zio-interop-cats/js/src/main/scala/zio/interop/ZioAsync.scala @@ -5,48 +5,51 @@ import zio.{ Promise, RIO, ZIO } import scala.concurrent.{ ExecutionContext, Future } -private class ZioAsync[R] extends ZioTemporal[R, Throwable] with Async[RIO[R, _]] { +private class ZioAsync[R] + extends ZioTemporal[R, Throwable, Throwable] + with Async[RIO[R, _]] + with ZioMonadErrorExitThrowable[R] { - override final def evalOn[A](fa: F[A], ec: ExecutionContext): F[A] = + override def evalOn[A](fa: F[A], ec: ExecutionContext): F[A] = fa.onExecutionContext(ec) - override final val executionContext: F[ExecutionContext] = + override def executionContext: F[ExecutionContext] = ZIO.executor.map(_.asExecutionContext) - override final val unique: F[Unique.Token] = + override def unique: F[Unique.Token] = ZIO.succeed(new Unique.Token) - override final def cont[K, Q](body: Cont[F, K, Q]): F[Q] = + override def cont[K, Q](body: Cont[F, K, Q]): F[Q] = Async.defaultCont(body)(this) - override final def suspend[A](hint: Sync.Type)(thunk: => A): F[A] = + override def suspend[A](hint: Sync.Type)(thunk: => A): F[A] = ZIO.attempt(thunk) - override final def delay[A](thunk: => A): F[A] = + override def delay[A](thunk: => A): F[A] = ZIO.attempt(thunk) - override final def defer[A](thunk: => F[A]): F[A] = + override def defer[A](thunk: => F[A]): F[A] = ZIO.suspend(thunk) - override final def blocking[A](thunk: => A): F[A] = + override def blocking[A](thunk: => A): F[A] = ZIO.attempt(thunk) - override final def interruptible[A](many: Boolean)(thunk: => A): F[A] = + override def interruptible[A](many: Boolean)(thunk: => A): F[A] = ZIO.attempt(thunk) - override final def async[A](k: (Either[Throwable, A] => Unit) => F[Option[F[Unit]]]): F[A] = + override def async[A](k: (Either[Throwable, A] => Unit) => F[Option[F[Unit]]]): F[A] = Promise.make[Nothing, Unit].flatMap { promise => ZIO.asyncZIO { register => k(either => register(promise.await *> ZIO.fromEither(either))) *> promise.succeed(()) } } - override final def async_[A](k: (Either[Throwable, A] => Unit) => Unit): F[A] = + override def async_[A](k: (Either[Throwable, A] => Unit) => Unit): F[A] = ZIO.async(register => k(register.compose(fromEither))) - override final def fromFuture[A](fut: F[Future[A]]): F[A] = + override def fromFuture[A](fut: F[Future[A]]): F[A] = fut.flatMap(f => ZIO.fromFuture(_ => f)) - override final def never[A]: F[A] = + override def never[A]: F[A] = ZIO.never } diff --git a/zio-interop-cats/jvm/src/main/scala/zio/interop/ZioAsync.scala b/zio-interop-cats/jvm/src/main/scala/zio/interop/ZioAsync.scala index ccf36ce7..bccec946 100644 --- a/zio-interop-cats/jvm/src/main/scala/zio/interop/ZioAsync.scala +++ b/zio-interop-cats/jvm/src/main/scala/zio/interop/ZioAsync.scala @@ -5,39 +5,42 @@ import zio.{ Exit, Promise, RIO, ZIO } import scala.concurrent.{ ExecutionContext, Future } -private class ZioAsync[R] extends ZioTemporal[R, Throwable] with Async[RIO[R, _]] { +private class ZioAsync[R] + extends ZioTemporal[R, Throwable, Throwable] + with Async[RIO[R, _]] + with ZioMonadErrorExitThrowable[R] { - override final def evalOn[A](fa: F[A], ec: ExecutionContext): F[A] = + override def evalOn[A](fa: F[A], ec: ExecutionContext): F[A] = fa.onExecutionContext(ec) - override final val executionContext: F[ExecutionContext] = + override def executionContext: F[ExecutionContext] = ZIO.executor.map(_.asExecutionContext) - override final val unique: F[Unique.Token] = + override def unique: F[Unique.Token] = ZIO.succeed(new Unique.Token) - override final def cont[K, Q](body: Cont[F, K, Q]): F[Q] = + override def cont[K, Q](body: Cont[F, K, Q]): F[Q] = Async.defaultCont(body)(this) - override final def suspend[A](hint: Sync.Type)(thunk: => A): F[A] = hint match { + override def suspend[A](hint: Sync.Type)(thunk: => A): F[A] = hint match { case Sync.Type.Delay => ZIO.attempt(thunk) case Sync.Type.Blocking => ZIO.attemptBlocking(thunk) case Sync.Type.InterruptibleOnce | Sync.Type.InterruptibleMany => ZIO.attemptBlockingInterrupt(thunk) } - override final def delay[A](thunk: => A): F[A] = + override def delay[A](thunk: => A): F[A] = ZIO.attempt(thunk) - override final def defer[A](thunk: => F[A]): F[A] = + override def defer[A](thunk: => F[A]): F[A] = ZIO.suspend(thunk) - override final def blocking[A](thunk: => A): F[A] = + override def blocking[A](thunk: => A): F[A] = ZIO.attemptBlocking(thunk) - override final def interruptible[A](many: Boolean)(thunk: => A): F[A] = + override def interruptible[A](many: Boolean)(thunk: => A): F[A] = ZIO.attemptBlockingInterrupt(thunk) - override final def async[A](k: (Either[Throwable, A] => Unit) => F[Option[F[Unit]]]): F[A] = + override def async[A](k: (Either[Throwable, A] => Unit) => F[Option[F[Unit]]]): F[A] = for { cancelerPromise <- Promise.make[Nothing, Option[F[Unit]]] res <- ZIO @@ -50,12 +53,12 @@ private class ZioAsync[R] extends ZioTemporal[R, Throwable] with Async[RIO[R, _] .onInterrupt(cancelerPromise.await.flatMap(ZIO.foreach(_)(identity)).orDie) } yield res - override final def async_[A](k: (Either[Throwable, A] => Unit) => Unit): F[A] = + override def async_[A](k: (Either[Throwable, A] => Unit) => Unit): F[A] = ZIO.async(register => k(register.compose(fromEither))) - override final def fromFuture[A](fut: F[Future[A]]): F[A] = + override def fromFuture[A](fut: F[Future[A]]): F[A] = fut.flatMap(f => ZIO.fromFuture(_ => f)) - override final def never[A]: F[A] = + override def never[A]: F[A] = ZIO.never } diff --git a/zio-interop-cats/shared/src/main/scala/zio/interop/cats.scala b/zio-interop-cats/shared/src/main/scala/zio/interop/cats.scala index 396a33e7..268e49d0 100644 --- a/zio-interop-cats/shared/src/main/scala/zio/interop/cats.scala +++ b/zio-interop-cats/shared/src/main/scala/zio/interop/cats.scala @@ -53,6 +53,23 @@ object catz extends CatsEffectPlatform { object implicits { implicit val rts: Runtime[Any] = Runtime.default } + + /** + * `import zio.interop.catz.generic._` brings in instances of + * `GenConcurrent` and `GenTemporal`,`MonadCancel` and `MonadError` + * for arbitrary non-Throwable `E` error type. + * + * These instances have somewhat different semantics than the instances + * in `catz` however - they operate on `Cause[E]` errors. Meaning that + * cats `ApplicativeError#handleErrorWith` operation can now recover from + * `ZIO.die` and other non-standard ZIO errors not supported by cats IO. + * + * However, in cases where an instance such as `MonadCancel[F, _]` is + * required by a function, these differences should not normally affect behavior - + * by ignoring the error type, such a function signals that it does not + * inspect the errors, but only uses `bracket` portion of `MonadCancel` for finalization. + */ + object generic extends CatsEffectInstancesCause } abstract class CatsEffectPlatform @@ -85,26 +102,48 @@ abstract class CatsEffectInstances extends CatsZioInstances { implicit final def asyncInstance[R]: Async[RIO[R, _]] = asyncInstance0.asInstanceOf[Async[RIO[R, _]]] - implicit final def temporalInstance[R, E]: GenTemporal[ZIO[R, E, _], E] = - temporalInstance0.asInstanceOf[GenTemporal[ZIO[R, E, _], E]] + implicit final def temporalInstance[R]: GenTemporal[ZIO[R, Throwable, _], Throwable] = + temporalInstance0.asInstanceOf[GenTemporal[ZIO[R, Throwable, _], Throwable]] - implicit final def concurrentInstance[R, E]: GenConcurrent[ZIO[R, E, _], E] = - concurrentInstance0.asInstanceOf[GenConcurrent[ZIO[R, E, _], E]] + implicit final def concurrentInstance[R]: GenConcurrent[ZIO[R, Throwable, _], Throwable] = + concurrentInstance0.asInstanceOf[GenConcurrent[ZIO[R, Throwable, _], Throwable]] implicit final def asyncRuntimeInstance[E](implicit runtime: Runtime[Any]): Async[Task] = new ZioRuntimeAsync - implicit final def temporalRuntimeInstance[E](implicit runtime: Runtime[Any]): GenTemporal[IO[E, _], E] = - new ZioRuntimeTemporal[E] + implicit final def temporalRuntimeInstance(implicit + runtime: Runtime[Any] + ): GenTemporal[IO[Throwable, _], Throwable] = + new ZioRuntimeTemporal[Throwable, Throwable] with ZioMonadErrorExitThrowable[Any] private[this] val asyncInstance0: Async[Task] = new ZioAsync private[this] val temporalInstance0: Temporal[Task] = - new ZioTemporal + new ZioTemporal[Any, Throwable, Throwable] with ZioMonadErrorExitThrowable[Any] private[this] val concurrentInstance0: Concurrent[Task] = - new ZioConcurrent[Any, Throwable] + new ZioConcurrent[Any, Throwable, Throwable] with ZioMonadErrorExitThrowable[Any] +} + +sealed abstract class CatsEffectInstancesCause extends CatsZioInstances { + + implicit final def temporalInstanceCause[R, E]: GenTemporal[ZIO[R, E, _], Cause[E]] = + temporalInstance1.asInstanceOf[GenTemporal[ZIO[R, E, _], Cause[E]]] + + implicit final def concurrentInstanceCause[R, E]: GenConcurrent[ZIO[R, E, _], Cause[E]] = + concurrentInstance1.asInstanceOf[GenConcurrent[ZIO[R, E, _], Cause[E]]] + + implicit final def temporalRuntimeInstanceCause[E](implicit + runtime: Runtime[Any] + ): GenTemporal[IO[E, _], Cause[E]] = + new ZioRuntimeTemporal[E, Cause[E]] with ZioMonadErrorExitCause[Any, E] + + private[this] val temporalInstance1: GenTemporal[ZIO[Any, Any, _], Cause[Any]] = + new ZioTemporal[Any, Any, Cause[Any]] with ZioMonadErrorExitCause[Any, Any] + + private[this] val concurrentInstance1: GenConcurrent[ZIO[Any, Any, _], Cause[Any]] = + new ZioConcurrent[Any, Any, Cause[Any]] with ZioMonadErrorExitCause[Any, Any] } abstract class CatsZioInstances extends CatsZioInstances1 { @@ -166,7 +205,7 @@ sealed abstract class CatsZioInstances2 { monadErrorInstance0.asInstanceOf[MonadError[ZIO[R, E, _], E]] private[this] val monadErrorInstance0: MonadError[Task, Throwable] = - new ZioMonadError[Any, Throwable] + new ZioMonadError[Any, Throwable, Throwable] with ZioMonadErrorE[Any, Throwable] } private class ZioDefer[R, E] extends Defer[ZIO[R, E, _]] { @@ -178,19 +217,20 @@ private class ZioDefer[R, E] extends Defer[ZIO[R, E, _]] { } } -private class ZioConcurrent[R, E] extends ZioMonadError[R, E] with GenConcurrent[ZIO[R, E, _], E] { +private abstract class ZioConcurrent[R, E, E1] + extends ZioMonadErrorExit[R, E, E1] + with GenConcurrent[ZIO[R, E, _], E1] { - private def toPoll(restore: ZIO.InterruptibilityRestorer) = new Poll[ZIO[R, E, _]] { - override def apply[T](fa: ZIO[R, E, T]): ZIO[R, E, T] = restore(fa)(CoreTracer.newTrace) + private def toFiber[A](fiber: Fiber[E, A]): effect.Fiber[F, E1, A] = new effect.Fiber[F, E1, A] { + override final val cancel: F[Unit] = fiber.interrupt.unit + override final val join: F[Outcome[F, E1, A]] = fiber.await.map(exitToOutcome) } - private def toFiber[A](fiber: Fiber[E, A])(implicit trace: Trace) = new effect.Fiber[F, E, A] { - override final val cancel: F[Unit] = fiber.interrupt.unit - override final val join: F[Outcome[F, E, A]] = fiber.await.map(toOutcome) - } - - private def fiberFailure(error: E) = - FiberFailure(Cause.fail(error)) + private def toThrowableOrFiberFailure(error: E): Throwable = + error match { + case t: Throwable => t + case _ => FiberFailure(Cause.fail(error)) + } override def ref[A](a: A): F[effect.Ref[F, A]] = { implicit def trace: Trace = CoreTracer.newTrace @@ -204,7 +244,7 @@ private class ZioConcurrent[R, E] extends ZioMonadError[R, E] with GenConcurrent Promise.make[E, A].map(new ZioDeferred(_)) } - override final def start[A](fa: F[A]): F[effect.Fiber[F, E, A]] = { + override final def start[A](fa: F[A]): F[effect.Fiber[F, E1, A]] = { implicit def trace: Trace = CoreTracer.newTrace fa.interruptible.forkDaemon.map(toFiber) @@ -225,7 +265,7 @@ private class ZioConcurrent[R, E] extends ZioMonadError[R, E] with GenConcurrent override final def uncancelable[A](body: Poll[F] => F[A]): F[A] = { implicit def trace: Trace = InteropTracer.newTrace(body) - ZIO.uninterruptibleMask(body.compose(toPoll)) + ZIO.uninterruptibleMask(restore => body(toPoll(restore))) } override final def canceled: F[Unit] = @@ -234,18 +274,21 @@ private class ZioConcurrent[R, E] extends ZioMonadError[R, E] with GenConcurrent override final def onCancel[A](fa: F[A], fin: F[Unit]): F[A] = { implicit def trace: Trace = CoreTracer.newTrace - fa.onError(cause => fin.orDieWith(fiberFailure).unless(cause.isFailure)) + fa.onError(cause => fin.orDieWith(toThrowableOrFiberFailure).unless(cause.isFailure)) } override final def memoize[A](fa: F[A]): F[F[A]] = fa.memoize(CoreTracer.newTrace) - override final def racePair[A, B](fa: F[A], fb: F[B]) = { + override final def racePair[A, B](fa: F[A], fb: F[B]): ZIO[R, Nothing, Either[ + (Outcome[F, E1, A], effect.Fiber[F, E1, B]), + (effect.Fiber[F, E1, A], Outcome[F, E1, B]) + ]] = { implicit def trace: Trace = CoreTracer.newTrace (fa.interruptible raceWith fb.interruptible)( - (exit, fiber) => ZIO.succeedNow(Left((toOutcome(exit), toFiber(fiber)))), - (exit, fiber) => ZIO.succeedNow(Right((toFiber(fiber), toOutcome(exit)))) + (exit, fiber) => ZIO.succeedNow(Left((exitToOutcome(exit), toFiber(fiber)))), + (exit, fiber) => ZIO.succeedNow(Right((toFiber(fiber), exitToOutcome(exit)))) ) } @@ -258,16 +301,16 @@ private class ZioConcurrent[R, E] extends ZioMonadError[R, E] with GenConcurrent override final def guarantee[A](fa: F[A], fin: F[Unit]): F[A] = { implicit def trace: Trace = CoreTracer.newTrace - fa.ensuring(fin.orDieWith(fiberFailure)) + fa.ensuring(fin.orDieWith(toThrowableOrFiberFailure)) } override final def bracket[A, B](acquire: F[A])(use: A => F[B])(release: A => F[Unit]): F[B] = { implicit def trace: Trace = InteropTracer.newTrace(use) - ZIO.acquireReleaseWith(acquire)(release.andThen(_.orDieWith(fiberFailure)))(use) + ZIO.acquireReleaseWith(acquire)(release.andThen(_.orDieWith(toThrowableOrFiberFailure)))(use) } - override val unique: F[Unique.Token] = + override def unique: F[Unique.Token] = ZIO.succeed(new Unique.Token)(CoreTracer.newTrace) } @@ -354,74 +397,70 @@ private final class ZioRef[R, E, A](ref: ZRef[A]) extends effect.Ref[ZIO[R, E, _ ref.get(CoreTracer.newTrace) } -private class ZioTemporal[R, E] extends ZioConcurrent[R, E] with GenTemporal[ZIO[R, E, _], E] { +private abstract class ZioTemporal[R, E, E1] extends ZioConcurrent[R, E, E1] with GenTemporal[ZIO[R, E, _], E1] { - override final def sleep(time: FiniteDuration): F[Unit] = { + override def sleep(time: FiniteDuration): F[Unit] = { implicit def trace: Trace = CoreTracer.newTrace ZIO.sleep(Duration.fromScala(time)) } - override final val monotonic: F[FiniteDuration] = { + override def monotonic: F[FiniteDuration] = { implicit def trace: Trace = CoreTracer.newTrace nanoTime.map(FiniteDuration(_, NANOSECONDS)) } - override final val realTime: F[FiniteDuration] = { + override def realTime: F[FiniteDuration] = { implicit def trace: Trace = CoreTracer.newTrace currentTime(MILLISECONDS).map(FiniteDuration(_, MILLISECONDS)) } } -private class ZioRuntimeTemporal[E](implicit runtime: Runtime[Any]) - extends ZioConcurrent[Any, E] - with GenTemporal[IO[E, _], E] { +private abstract class ZioRuntimeTemporal[E, E1](implicit runtime: Runtime[Any]) extends ZioTemporal[Any, E, E1] { - private[this] val underlying: GenTemporal[ZIO[Any, E, _], E] = new ZioTemporal[Any, E] - private[this] val clock: ZEnvironment[Any] = runtime.environment + private[this] val clock: ZEnvironment[Any] = runtime.environment override final def sleep(time: FiniteDuration): F[Unit] = { implicit def trace: Trace = CoreTracer.newTrace - underlying.sleep(time).provideEnvironment(clock) + super.sleep(time).provideEnvironment(clock) } override final val monotonic: F[FiniteDuration] = { implicit def trace: Trace = CoreTracer.newTrace - underlying.monotonic.provideEnvironment(clock) + super.monotonic.provideEnvironment(clock) } override final val realTime: F[FiniteDuration] = { implicit def trace: Trace = CoreTracer.newTrace - underlying.realTime.provideEnvironment(clock) + super.realTime.provideEnvironment(clock) } } -private class ZioRuntimeAsync(implicit runtime: Runtime[Any]) extends ZioRuntimeTemporal[Throwable] with Async[Task] { +private class ZioRuntimeAsync(implicit runtime: Runtime[Any]) extends ZioAsync[Any] { - private[this] val underlying: Async[RIO[Any, _]] = new ZioAsync[Any] private[this] val environment: ZEnvironment[Any] = runtime.environment override final def evalOn[A](fa: F[A], ec: ExecutionContext): F[A] = { implicit def trace: Trace = CoreTracer.newTrace - underlying.evalOn(fa, ec).provideEnvironment(environment) + super.evalOn(fa, ec).provideEnvironment(environment) } override final val executionContext: F[ExecutionContext] = { implicit def trace: Trace = CoreTracer.newTrace - underlying.executionContext.provideEnvironment(environment) + super.executionContext.provideEnvironment(environment) } override final val unique: F[Unique.Token] = { implicit def trace: Trace = CoreTracer.newTrace - underlying.unique.provideEnvironment(environment) + super.unique.provideEnvironment(environment) } override final def cont[K, Q](body: Cont[F, K, Q]): F[Q] = @@ -431,60 +470,60 @@ private class ZioRuntimeAsync(implicit runtime: Runtime[Any]) extends ZioRuntime val byName: () => A = () => thunk implicit def trace: Trace = InteropTracer.newTrace(byName) - underlying.suspend(hint)(thunk).provideEnvironment(environment) + super.suspend(hint)(thunk).provideEnvironment(environment) } override final def delay[A](thunk: => A): F[A] = { val byName: () => A = () => thunk implicit def trace: Trace = InteropTracer.newTrace(byName) - underlying.delay(thunk).provideEnvironment(environment) + super.delay(thunk).provideEnvironment(environment) } override final def defer[A](thunk: => F[A]): F[A] = { val byName: () => F[A] = () => thunk implicit def trace: Trace = InteropTracer.newTrace(byName) - underlying.defer(thunk).provideEnvironment(environment) + super.defer(thunk).provideEnvironment(environment) } override final def blocking[A](thunk: => A): F[A] = { val byName: () => A = () => thunk implicit def trace: Trace = InteropTracer.newTrace(byName) - underlying.blocking(thunk).provideEnvironment(environment) + super.blocking(thunk).provideEnvironment(environment) } override final def interruptible[A](many: Boolean)(thunk: => A): F[A] = { val byName: () => A = () => thunk implicit def trace: Trace = InteropTracer.newTrace(byName) - underlying.interruptible(many)(thunk).provideEnvironment(environment) + super.interruptible(many)(thunk).provideEnvironment(environment) } override final def async[A](k: (Either[Throwable, A] => Unit) => F[Option[F[Unit]]]): F[A] = { implicit def trace: Trace = InteropTracer.newTrace(k) - underlying.async(k).provideEnvironment(environment) + super.async(k).provideEnvironment(environment) } override final def async_[A](k: (Either[Throwable, A] => Unit) => Unit): F[A] = { implicit def trace: Trace = InteropTracer.newTrace(k) - underlying.async_(k).provideEnvironment(environment) + super.async_(k).provideEnvironment(environment) } override final def fromFuture[A](fut: F[Future[A]]): F[A] = { implicit def trace: Trace = CoreTracer.newTrace - underlying.fromFuture(fut).provideEnvironment(environment) + super.fromFuture(fut).provideEnvironment(environment) } override final def never[A]: F[A] = ZIO.never(CoreTracer.newTrace) } -private class ZioMonadError[R, E] extends MonadError[ZIO[R, E, _], E] with StackSafeMonad[ZIO[R, E, _]] { +private abstract class ZioMonadError[R, E, E1] extends MonadError[ZIO[R, E, _], E1] with StackSafeMonad[ZIO[R, E, _]] { type F[A] = ZIO[R, E, A] override final def pure[A](a: A): F[A] = @@ -529,6 +568,19 @@ private class ZioMonadError[R, E] extends MonadError[ZIO[R, E, _], E] with Stack override final def unit: F[Unit] = ZIO.unit + override final def tailRecM[A, B](a: A)(f: A => F[Either[A, B]]): F[B] = { + def loop(a: A): F[B] = f(a).flatMap { + case Left(a) => loop(a) + case Right(b) => ZIO.succeedNow(b) + } + + ZIO.suspendSucceed(loop(a)) + } + +} + +private trait ZioMonadErrorE[R, E] extends ZioMonadError[R, E, E] { + override final def handleErrorWith[A](fa: F[A])(f: E => F[A]): F[A] = { implicit def trace: Trace = InteropTracer.newTrace(f) @@ -560,6 +612,50 @@ private class ZioMonadError[R, E] extends MonadError[ZIO[R, E, _], E] with Stack } } +private trait ZioMonadErrorCause[R, E] extends ZioMonadError[R, E, Cause[E]] { + + override final def handleErrorWith[A](fa: F[A])(f: Cause[E] => F[A]): F[A] = +// fa.catchAllCause(f) + fa.catchSomeCause { + // pretend that we can't catch inner interrupt to satisfy `uncancelable canceled associates right over flatMap attempt` + // law since we use a poor definition of `canceled=ZIO.interrupt` right now + // https://github.com/zio/interop-cats/issues/503#issuecomment-1157101175= + case c if !c.isInterrupted => f(c) + } + + override final def recoverWith[A](fa: F[A])(pf: PartialFunction[Cause[E], F[A]]): F[A] = +// fa.catchSomeCause(pf) + fa.catchSomeCause(({ case c if !c.isInterrupted => c }: PartialFunction[Cause[E], Cause[E]]).andThen(pf)) + + override final def raiseError[A](e: Cause[E]): F[A] = + ZIO.failCause(e) + + override final def attempt[A](fa: F[A]): F[Either[Cause[E], A]] = +// fa.sandbox.attempt + fa.map(Right(_)).catchSomeCause { + case c if !c.isInterrupted => ZIO.succeedNow(Left(c)) + } + + override final def adaptError[A](fa: F[A])(pf: PartialFunction[Cause[E], Cause[E]]): F[A] = + fa.mapErrorCause(pf.orElse { case error => error }) +} + +private abstract class ZioMonadErrorExit[R, E, E1] extends ZioMonadError[R, E, E1] { + protected def exitToOutcome[A](exit: Exit[E, A])(implicit trace: Trace): Outcome[F, E1, A] +} + +private trait ZioMonadErrorExitThrowable[R] + extends ZioMonadErrorExit[R, Throwable, Throwable] + with ZioMonadErrorE[R, Throwable] { + override protected def exitToOutcome[A](exit: Exit[Throwable, A])(implicit trace: Trace): Outcome[F, Throwable, A] = + toOutcomeThrowable(exit) +} + +private trait ZioMonadErrorExitCause[R, E] extends ZioMonadErrorExit[R, E, Cause[E]] with ZioMonadErrorCause[R, E] { + override protected def exitToOutcome[A](exit: Exit[E, A])(implicit trace: Trace): Outcome[F, Cause[E], A] = + toOutcomeCause(exit) +} + private class ZioSemigroupK[R, E] extends SemigroupK[ZIO[R, E, _]] { type F[A] = ZIO[R, E, A] diff --git a/zio-interop-cats/shared/src/main/scala/zio/interop/catszmanaged.scala b/zio-interop-cats/shared/src/main/scala/zio/interop/catszmanaged.scala index 3631dc42..14064b94 100644 --- a/zio-interop-cats/shared/src/main/scala/zio/interop/catszmanaged.scala +++ b/zio-interop-cats/shared/src/main/scala/zio/interop/catszmanaged.scala @@ -67,13 +67,14 @@ final class ZIOResourceSyntax[R, E <: Throwable, A](private val resource: Resour */ def toScopedZIO(implicit trace: Trace): ZIO[R with Scope, E, A] = { type F[T] = ZIO[R, E, T] - val F = MonadCancel[F, E] def go[B](resource: Resource[F, B]): ZIO[R with Scope, E, B] = resource match { case allocate: Resource.Allocate[F, b] => ZIO.acquireReleaseExit { - F.uncancelable(allocate.resource) + ZIO.uninterruptibleMask { restore => + allocate.resource(toPoll(restore)) + } } { case ((_, release), exit) => release(toExitCase(exit)).orDie }.map(_._1) @@ -126,6 +127,7 @@ final class ScopedSyntax(private val self: Resource.type) extends AnyVal { def scoped[F[_]: Async, R, A]( zio: ZIO[Scope with R, Throwable, A] )(implicit runtime: Runtime[R], trace: Trace): Resource[F, A] = + // import _root_.zio.interop.catz.generic.* scopedZIO[R, Throwable, A](zio).mapK(new (ZIO[R, Throwable, _] ~> F) { override def apply[B](zio: ZIO[R, Throwable, B]) = toEffect(zio) }) diff --git a/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala b/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala index bc0887ab..1b36e73a 100644 --- a/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala +++ b/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala @@ -16,7 +16,7 @@ package zio -import cats.effect.kernel.{ Async, Outcome, Resource } +import cats.effect.kernel.{ Async, Outcome, Poll, Resource } import cats.effect.std.Dispatcher import cats.syntax.all.* @@ -24,18 +24,30 @@ import scala.concurrent.Future package object interop { - @inline private[interop] def toOutcome[R, E, A]( - exit: Exit[E, A] - )(implicit trace: Trace): Outcome[ZIO[R, E, _], E, A] = + @inline private[interop] def toOutcomeCause[R, E, A](exit: Exit[E, A]): Outcome[ZIO[R, E, _], Cause[E], A] = exit match { case Exit.Success(value) => - Outcome.Succeeded(ZIO.succeed(value)) + Outcome.Succeeded(ZIO.succeedNow(value)) + case Exit.Failure(cause) if cause.isInterrupted => + Outcome.Canceled() + case Exit.Failure(cause) => + Outcome.Errored(cause) + } + + @inline private[interop] def toOutcomeThrowable[R, A]( + exit: Exit[Throwable, A] + ): Outcome[ZIO[R, Throwable, _], Throwable, A] = + exit match { + case Exit.Success(value) => + Outcome.Succeeded(ZIO.succeedNow(value)) case Exit.Failure(cause) if cause.isInterrupted => Outcome.Canceled() case Exit.Failure(cause) => cause.failureOrCause match { case Left(error) => Outcome.Errored(error) - case Right(cause) => Outcome.Succeeded(ZIO.failCause(cause)) + case Right(cause) => + val compositeError = dieCauseToThrowable(cause) + Outcome.Errored(compositeError) } } @@ -54,18 +66,33 @@ package object interop { Resource.ExitCase.Canceled case Exit.Failure(cause) => cause.failureOrCause match { - case Left(error: Throwable) => Resource.ExitCase.Errored(error) - case _ => Resource.ExitCase.Errored(FiberFailure(cause)) + case Left(error: Throwable) => + Resource.ExitCase.Errored(error) + case Left(_) => + Resource.ExitCase.Errored(FiberFailure(cause)) + case Right(cause) => + val compositeError = dieCauseToThrowable(cause) + Resource.ExitCase.Errored(compositeError) } } + private[interop] def toPoll[R, E](restore: ZIO.InterruptibilityRestorer): Poll[ZIO[R, E, _]] = + new Poll[ZIO[R, E, _]] { + override def apply[T](fa: ZIO[R, E, T]): ZIO[R, E, T] = restore(fa) + } + + @inline private def dieCauseToThrowable(cause: Cause[Nothing]): Throwable = + cause.defects match { + case one :: Nil => one + case _ => FiberFailure(cause) + } + @inline def fromEffect[F[_], A](fa: F[A])(implicit F: Dispatcher[F], trace: Trace): Task[A] = ZIO .succeed(F.unsafeToFutureCancelable(fa)) .flatMap { case (future, cancel) => - ZIO.fromFuture(_ => future).onInterrupt(ZIO.fromFuture(_ => cancel()).orDie).interruptible + ZIO.fromFuture(_ => future).onInterrupt(ZIO.fromFuture(_ => cancel()).orDie) } - .uninterruptible @inline def toEffect[F[_], R, A]( rio: RIO[R, A] diff --git a/zio-interop-cats/shared/src/main/scala/zio/stream/interop/FS2StreamSyntax.scala b/zio-interop-cats/shared/src/main/scala/zio/stream/interop/FS2StreamSyntax.scala index cc1985bd..8c33bfd4 100644 --- a/zio-interop-cats/shared/src/main/scala/zio/stream/interop/FS2StreamSyntax.scala +++ b/zio-interop-cats/shared/src/main/scala/zio/stream/interop/FS2StreamSyntax.scala @@ -20,12 +20,15 @@ trait FS2StreamSyntax { class ZStreamSyntax[R, E, A](private val stream: ZStream[R, E, A]) extends AnyVal { /** Convert a [[zio.stream.ZStream]] into an [[fs2.Stream]]. */ - def toFs2Stream(implicit trace: Trace): fs2.Stream[ZIO[R, E, _], A] = + def toFs2Stream(implicit trace: Trace): fs2.Stream[ZIO[R, E, _], A] = { + import zio.interop.catz.generic.* + fs2.Stream.resource(Resource.scopedZIO[R, E, ZIO[R, Option[E], Chunk[A]]](stream.toPull)).flatMap { pull => fs2.Stream.repeatEval(pull.unsome).unNoneTerminate.flatMap { chunk => fs2.Stream.chunk(fs2.Chunk.indexedSeq(chunk)) } } + } } final class FS2RIOStreamSyntax[R, A](private val stream: Stream[RIO[R, _], A]) { From 770d945d9735243820ce28a90c6ec29ce64d4271 Mon Sep 17 00:00:00 2001 From: johnspade Date: Tue, 2 Aug 2022 00:10:51 +0300 Subject: [PATCH 3/9] Fix #503 implement MonadCancel#canceled by sending an external interrupt to current fiber via Fiber.unsafeCurrentFiber #544 https://github.com/zio/interop-cats/pull/544 --- .../scala/zio/interop/CatsInteropSpec.scala | 95 +++++++- .../zio/interop/CatsZManagedSyntaxSpec.scala | 86 +++++++- .../test/scala/zio/interop/CatsSpecBase.scala | 9 +- .../scala/zio/interop/GenIOInteropCats.scala | 60 ++++-- .../test/scala/zio/interop/ZioSpecBase.scala | 42 +++- .../src/main/scala/zio/interop/ZioAsync.scala | 18 +- .../src/main/scala/zio/interop/ZioAsync.scala | 26 +-- .../src/main/scala/zio/interop/cats.scala | 164 ++++++++++---- .../main/scala/zio/interop/catszmanaged.scala | 5 +- .../src/main/scala/zio/interop/package.scala | 202 +++++++++++++----- .../zio/test/interop/CatsRunnableSpec.scala | 13 +- 11 files changed, 563 insertions(+), 157 deletions(-) diff --git a/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsInteropSpec.scala b/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsInteropSpec.scala index 6031c9d0..44567b29 100644 --- a/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsInteropSpec.scala +++ b/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsInteropSpec.scala @@ -1,10 +1,10 @@ package zio.interop -import cats.effect.{ Async, IO as CIO, LiftIO } +import cats.effect.{ Async, IO as CIO, LiftIO, Outcome } import cats.effect.kernel.{ Concurrent, Resource } import zio.interop.catz.* import zio.test.* -import zio.{ Promise, Task, ZIO } +import zio.* object CatsInteropSpec extends CatsRunnableSpec { def spec = suite("Cats interop")( @@ -51,6 +51,97 @@ object CatsInteropSpec extends CatsRunnableSpec { sanityCheckCIO <- fromEffect(test[CIO]) zioResult <- test[Task] } yield zioResult && sanityCheckCIO + }, + test("onCancel is not triggered by ZIO.parTraverse + ZIO.fail https://github.com/zio/zio/issues/6911") { + val F = Concurrent[Task] + + for { + counter <- F.ref("") + _ <- F.guaranteeCase( + F.onError( + F.onCancel( + ZIO.collectAllPar( + List( + ZIO.unit.forever, + counter.update(_ + "A") *> ZIO.fail(new RuntimeException("x")).unit + ) + ), + counter.update(_ + "1") + ) + ) { case _ => counter.update(_ + "B") } + ) { + case Outcome.Errored(_) => counter.update(_ + "C") + case Outcome.Canceled() => counter.update(_ + "2") + case Outcome.Succeeded(_) => counter.update(_ + "3") + }.exit + res <- counter.get + } yield assertTrue(!res.contains("1")) && assertTrue(res == "ABC") + }, + test("onCancel is not triggered by ZIO.parTraverse + ZIO.die https://github.com/zio/zio/issues/6911") { + val F = Concurrent[Task] + + for { + counter <- F.ref("") + _ <- F.guaranteeCase( + F.onError( + F.onCancel( + ZIO.collectAllPar( + List( + ZIO.unit.forever, + counter.update(_ + "A") *> ZIO.die(new RuntimeException("x")).unit + ) + ), + counter.update(_ + "1") + ) + ) { case _ => counter.update(_ + "B") } + ) { + case Outcome.Errored(_) => counter.update(_ + "C") + case Outcome.Canceled() => counter.update(_ + "2") + case Outcome.Succeeded(_) => counter.update(_ + "3") + }.exit + res <- counter.get + } yield assertTrue(!res.contains("1")) && assertTrue(res == "AC") + }, + test("onCancel is not triggered by ZIO.parTraverse + ZIO.interrupt https://github.com/zio/zio/issues/6911") { + val F = Concurrent[Task] + + for { + counter <- F.ref("") + _ <- F.guaranteeCase( + F.onError( + F.onCancel( + ZIO.collectAllPar( + List( + ZIO.unit.forever, + counter.update(_ + "A") *> ZIO.interrupt.unit + ) + ), + counter.update(_ + "1") + ) + ) { case _ => counter.update(_ + "B") } + ) { + case Outcome.Errored(_) => counter.update(_ + "C") + case Outcome.Canceled() => counter.update(_ + "2") + case Outcome.Succeeded(_) => counter.update(_ + "3") + }.exit + res <- counter.get + } yield assertTrue(!res.contains("1")) && assertTrue(res == "AC") + }, + test("F.canceled.toEffect results in CancellationException, not BoxedException") { + val F = Concurrent[Task] + + val exception: Option[Throwable] = + try { + F.canceled.toEffect[cats.effect.IO].unsafeRunSync() + None + } catch { + case t: Throwable => Some(t) + } + + assertTrue( + !exception.get.getMessage.contains("Boxed Exception") && + exception.get.getMessage.contains("The fiber was canceled") + ) } ) } diff --git a/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsZManagedSyntaxSpec.scala b/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsZManagedSyntaxSpec.scala index aef69c4e..b3c62da7 100644 --- a/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsZManagedSyntaxSpec.scala +++ b/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsZManagedSyntaxSpec.scala @@ -1,6 +1,6 @@ package zio.interop -import cats.effect.kernel.Resource +import cats.effect.kernel.{ Concurrent, Resource } import cats.effect.IO as CIO import zio.* import zio.interop.catz.* @@ -15,13 +15,39 @@ object CatsZManagedSyntaxSpec extends CatsRunnableSpec { def spec = suite("CatsZManagedSyntaxSpec")( suite("toManaged")( - test("calls finalizers correctly when use is interrupted") { + test("calls finalizers correctly when use is externally interrupted") { val effects = new mutable.ListBuffer[Int] def res(x: Int): Resource[CIO, Unit] = Resource.makeCase(CIO.delay(effects += x).void) { case (_, Resource.ExitCase.Canceled) => CIO.delay(effects += x + 1).void - case _ => CIO.unit + case (_, _) => + CIO.unit + } + + val testCase = { + val managed: ZManaged[Any, Throwable, Unit] = res(1).toManaged + Promise.make[Nothing, Unit].flatMap { latch => + managed + .use(_ => latch.succeed(()) *> ZIO.never) + .forkDaemon + .flatMap(latch.await *> _.interrupt) + } + } + + for { + _ <- testCase + effects <- ZIO.succeed(effects.toList) + } yield assert(effects)(equalTo(List(1, 2))) + }, + test("calls finalizers correctly when use is internally interrupted") { + val effects = new mutable.ListBuffer[Int] + def res(x: Int): Resource[CIO, Unit] = + Resource.makeCase(CIO.delay(effects += x).void) { + case (_, Resource.ExitCase.Errored(_)) => + CIO.delay(effects += x + 1).void + case (_, _) => + CIO.unit } val testCase = { @@ -128,7 +154,7 @@ object CatsZManagedSyntaxSpec extends CatsRunnableSpec { } ), suite("toManagedZIO")( - test("calls finalizers correctly when use is interrupted") { + test("calls finalizers correctly when use is externally interrupted") { val effects = new mutable.ListBuffer[Int] def res(x: Int): Resource[Task, Unit] = Resource.makeCase(ZIO.attempt(effects += x).unit) { @@ -137,6 +163,30 @@ object CatsZManagedSyntaxSpec extends CatsRunnableSpec { case _ => ZIO.unit } + val testCase = { + val managed: ZManaged[Any, Throwable, Unit] = res(1).toManagedZIO + Promise.make[Nothing, Unit].flatMap { latch => + managed + .use(_ => latch.succeed(()) *> ZIO.never) + .forkDaemon + .flatMap(latch.await *> _.interrupt) + } + } + + for { + _ <- testCase + effects <- ZIO.succeed(effects.toList) + } yield assert(effects)(equalTo(List(1, 2))) + }, + test("calls finalizers correctly when use is internally interrupted") { + val effects = new mutable.ListBuffer[Int] + def res(x: Int): Resource[Task, Unit] = + Resource.makeCase(ZIO.attempt(effects += x).unit) { + case (_, Resource.ExitCase.Errored(_)) => + ZIO.attempt(effects += x + 1).unit + case _ => ZIO.unit + } + val testCase = { val managed: ZManaged[Any, Throwable, Unit] = res(1).toManagedZIO managed.use(_ => ZIO.interrupt.unit) @@ -268,13 +318,13 @@ object CatsZManagedSyntaxSpec extends CatsRunnableSpec { effects <- ZIO.succeed(effects.toList) } yield assert(effects)(equalTo(List(1, 2))) }, - test("calls finalizers when using resource is canceled") { + test("calls finalizers when using resource is internally interrupted") { val effects = new mutable.ListBuffer[Int] def man(x: Int): ZManaged[Any, Throwable, Unit] = ZManaged.acquireReleaseExitWith(ZIO.succeed(effects += x).unit) { - case (_, e) if e.isInterrupted => + case (_, Exit.Failure(c)) if !c.isInterrupted && c.failureOption.nonEmpty => ZIO.succeed(effects += x + 1) - case _ => + case _ => ZIO.unit } @@ -284,6 +334,28 @@ object CatsZManagedSyntaxSpec extends CatsRunnableSpec { effects <- ZIO.succeed(effects.toList) } yield assert(effects)(equalTo(List(1, 2))) }, + test("calls finalizers when using resource is externally interrupted") { // todo + val effects = new mutable.ListBuffer[Int] + def man(x: Int): ZManaged[Any, Throwable, Unit] = + ZManaged.acquireReleaseExitWith(ZIO.succeed(effects += x).unit) { + case (_, e) if e.isInterrupted => + ZIO.succeed(effects += x + 1) + case _ => + ZIO.unit + } + + val exception: Option[Throwable] = + try { + man(1).toResource[Task].use(_ => Concurrent[Task].canceled).toEffect[cats.effect.IO].unsafeRunSync() + None + } catch { + case t: Throwable => Some(t) + } + + assert(effects.toList)(equalTo(List(1, 2))) && assertTrue( + exception.get.getMessage.contains("The fiber was canceled") + ) + }, test("acquisition of Reservation preserves cancellability in new F") { for { startLatch <- Promise.make[Nothing, Unit] diff --git a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpecBase.scala b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpecBase.scala index 7ff91bb7..e6ec4122 100644 --- a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpecBase.scala +++ b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpecBase.scala @@ -94,7 +94,12 @@ private[zio] trait CatsSpecBase FiberRef.currentBlockingExecutor -> ::(fiberId -> blockingExecutor, Nil) ) ) - val runtimeFlags = RuntimeFlags.default + val runtimeFlags = RuntimeFlags( // todo + RuntimeFlag.FiberRoots, + RuntimeFlag.Interruption, + RuntimeFlag.CooperativeYielding, + RuntimeFlag.CurrentFiber + ) Runtime(ZEnvironment.empty, fiberRefs, runtimeFlags) } @@ -113,6 +118,8 @@ private[zio] trait CatsSpecBase implicit val eqForNothing: Eq[Nothing] = Eq.allEqual + // workaround for laws `evalOn local pure` & `executionContext commutativity` + // (ZIO cannot implement them at all due to `.executor.asEC` losing the original executionContext) implicit val eqForExecutionContext: Eq[ExecutionContext] = Eq.allEqual diff --git a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/GenIOInteropCats.scala b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/GenIOInteropCats.scala index 4571013a..d7e8a9e5 100644 --- a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/GenIOInteropCats.scala +++ b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/GenIOInteropCats.scala @@ -1,16 +1,22 @@ package zio.interop +import cats.effect.GenConcurrent import org.scalacheck.* import zio.* -import zio.managed.* -/** - * Temporary fork of zio.GenIO that overrides `genParallel` with ZManaged-based code - * instead of `io.zipPar(parIo).map(_._1)` - * because ZIP-PAR IS NON-DETERMINISTIC IN ITS SPAWNED EC TASKS (required for TestContext equality) - */ trait GenIOInteropCats { + // FIXME generating anything but success (even genFail) + // surfaces multiple further unaddressed law failures + def betterGenerators: Boolean = false + + // FIXME cats conversion surfaces failures in the following laws: + // `async left is uncancelable sequenced raiseError` + // `async right is uncancelable sequenced pure` + // `applicativeError onError raise` + // `canceled sequences onCanceled in order` + def catsConversionGenerator: Boolean = false + /** * Given a generator for `A`, produces a generator for `IO[E, A]` using the `IO.point` constructor. */ @@ -27,7 +33,34 @@ trait GenIOInteropCats { */ def genSuccess[E, A: Arbitrary]: Gen[IO[E, A]] = Gen.oneOf(genSyncSuccess[E, A], genAsyncSuccess[E, A]) - def genIO[E, A: Arbitrary]: Gen[IO[E, A]] = genSuccess[E, A] + def genFail[E: Arbitrary, A]: Gen[IO[E, A]] = Arbitrary.arbitrary[E].map(ZIO.fail[E](_)) + + def genDie(implicit arbThrowable: Arbitrary[Throwable]): Gen[UIO[Nothing]] = arbThrowable.arbitrary.map(ZIO.die(_)) + + def genInternalInterrupt: Gen[UIO[Nothing]] = ZIO.interrupt + + def genCancel[E, A: Arbitrary](implicit F: GenConcurrent[IO[E, _], ?]): Gen[IO[E, A]] = + Arbitrary.arbitrary[A].map(F.canceled.as(_)) + + def genNever: Gen[UIO[Nothing]] = ZIO.never + + def genIO[E: Arbitrary, A: Arbitrary](implicit + arbThrowable: Arbitrary[Throwable], + F: GenConcurrent[IO[E, _], ?] + ): Gen[IO[E, A]] = if (betterGenerators) + Gen.oneOf( + genSuccess[E, A], + genFail[E, A], + genDie, + genInternalInterrupt, + genCancel[E, A], + genNever + ) + else + Gen.oneOf( + genSuccess[E, A], + genNever + ) def genUIO[A: Arbitrary]: Gen[UIO[A]] = Gen.oneOf(genSuccess[Nothing, A], genIdentityTrans(genSuccess[Nothing, A])) @@ -98,17 +131,8 @@ trait GenIOInteropCats { Gen.const(io.flatMap(a => ZIO.succeed(a))) private def genOfRace[E, A](io: IO[E, A]): Gen[IO[E, A]] = - Gen.const(io.raceFirst(ZIO.never.interruptible)) + Gen.const(io.interruptible.raceFirst(ZIO.never.interruptible)) private def genOfParallel[E, A](io: IO[E, A])(gen: Gen[IO[E, A]]): Gen[IO[E, A]] = - gen.map { parIo => - // this should work, but generates more random failures on CI -// io.interruptible.zipPar(parIo.interruptible).map(_._1) - Promise.make[Nothing, Unit].flatMap { p => - ZManaged - .fromZIO(parIo *> p.succeed(())) - .fork - .useDiscard(p.await *> io) - } - } + gen.map(parIo => io.interruptible.zipPar(parIo.interruptible).map(_._1)) } diff --git a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/ZioSpecBase.scala b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/ZioSpecBase.scala index 5f5a27cc..43061359 100644 --- a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/ZioSpecBase.scala +++ b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/ZioSpecBase.scala @@ -2,6 +2,7 @@ package zio.interop import org.scalacheck.{ Arbitrary, Cogen, Gen } import zio.* +import zio.internal.stacktracer.Tracer import zio.managed.* private[interop] trait ZioSpecBase extends CatsSpecBase with ZioSpecBaseLowPriority with GenIOInteropCats { @@ -23,8 +24,11 @@ private[interop] trait ZioSpecBase extends CatsSpecBase with ZioSpecBaseLowPrior Gen.oneOf( e.arbitrary.map(Cause.Fail(_, StackTrace.none)), Arbitrary.arbitrary[Throwable].map(Cause.Die(_, StackTrace.none)), - // Generating interrupt failures causes law failures (`canceled`/`Outcome.Canceled` are ill-defined as of now https://github.com/zio/interop-cats/issues/503#issuecomment-1157101175=) -// Gen.long.flatMap(l1 => Gen.long.map(l2 => Cause.Interrupt(Fiber.Id(l1, l2)))), + Arbitrary + .arbitrary[Int] + .flatMap(l1 => + Arbitrary.arbitrary[Int].map(l2 => Cause.Interrupt(FiberId(l1, l2, Tracer.instance.empty), StackTrace.none)) + ), Gen.delay(self.map(Cause.stack)), Gen.delay(self.map(Cause.stackless)), Gen.delay(self.flatMap(e1 => self.map(e2 => Cause.Both(e1, e2)))), @@ -48,17 +52,41 @@ private[interop] trait ZioSpecBaseLowPriority { self: ZioSpecBase => implicit def arbitraryIO[E: CanFail: Arbitrary: Cogen, A: Arbitrary: Cogen]: Arbitrary[IO[E, A]] = { implicitly[CanFail[E]] - Arbitrary(Gen.oneOf(genIO[E, A], genLikeTrans(genIO[E, A]), genIdentityTrans(genIO[E, A]))) + import zio.interop.catz.generic.concurrentInstanceCause + Arbitrary( + Gen.oneOf( + genIO[E, A], + genLikeTrans(genIO[E, A]), + genIdentityTrans(genIO[E, A]) + ) + ) } implicit def arbitraryZIO[R: Cogen: Tag, E: CanFail: Arbitrary: Cogen, A: Arbitrary: Cogen]: Arbitrary[ZIO[R, E, A]] = Arbitrary(Gen.function1[ZEnvironment[R], IO[E, A]](arbitraryIO[E, A].arbitrary).map(ZIO.environment[R].flatMap)) - implicit def arbitraryRIO[R: Cogen: Tag, A: Arbitrary: Cogen]: Arbitrary[RIO[R, A]] = - arbitraryZIO[R, Throwable, A] + implicit def arbitraryTask[A: Arbitrary: Cogen](implicit ticker: Ticker): Arbitrary[Task[A]] = { + val arbIO = arbitraryIO[Throwable, A] + if (catsConversionGenerator) + Arbitrary(Gen.oneOf(arbIO.arbitrary, genCatsConversionTask[A])) + else + arbIO + } - implicit def arbitraryTask[A: Arbitrary: Cogen]: Arbitrary[Task[A]] = - arbitraryIO[Throwable, A] + def genCatsConversionTask[A: Arbitrary: Cogen](implicit ticker: Ticker): Gen[Task[A]] = + arbitraryIO[A].arbitrary.map(liftIO(_)) + + def liftIO[A](io: cats.effect.IO[A])(implicit ticker: Ticker): zio.Task[A] = + ZIO.asyncInterrupt { k => + val (result, cancel) = io.unsafeToFutureCancelable() + k(ZIO.fromFuture(_ => result).tapError { + case c: scala.concurrent.CancellationException if c.getMessage == "The fiber was canceled" => + zio.interop.catz.concurrentInstance.canceled *> ZIO.interrupt + case _ => + ZIO.unit + }) + Left(ZIO.fromFuture(_ => cancel()).orDie) + } def zManagedArbitrary[R, E, A](implicit zio: Arbitrary[ZIO[R, E, A]]): Arbitrary[ZManaged[R, E, A]] = Arbitrary(zio.arbitrary.map(ZManaged.fromZIO(_))) diff --git a/zio-interop-cats/js/src/main/scala/zio/interop/ZioAsync.scala b/zio-interop-cats/js/src/main/scala/zio/interop/ZioAsync.scala index 455f9b4d..15a8123c 100644 --- a/zio-interop-cats/js/src/main/scala/zio/interop/ZioAsync.scala +++ b/zio-interop-cats/js/src/main/scala/zio/interop/ZioAsync.scala @@ -1,7 +1,7 @@ package zio.interop import cats.effect.kernel.{ Async, Cont, Sync, Unique } -import zio.{ Promise, RIO, ZIO } +import zio.{ RIO, ZIO } import scala.concurrent.{ ExecutionContext, Future } @@ -38,10 +38,18 @@ private class ZioAsync[R] ZIO.attempt(thunk) override def async[A](k: (Either[Throwable, A] => Unit) => F[Option[F[Unit]]]): F[A] = - Promise.make[Nothing, Unit].flatMap { promise => - ZIO.asyncZIO { register => - k(either => register(promise.await *> ZIO.fromEither(either))) *> promise.succeed(()) - } + ZIO.suspendSucceed { + val p = scala.concurrent.Promise[Either[Throwable, A]]() + + def get: F[A] = + ZIO.fromFuture(_ => p.future).flatMap[Any, Throwable, A](ZIO.fromEither(_)) + + ZIO.uninterruptibleMask(restore => + k({ e => p.trySuccess(e); () }).flatMap { + case Some(canceler) => onCancel(restore(get), canceler.orDie) + case None => restore(get) + } + ) } override def async_[A](k: (Either[Throwable, A] => Unit) => Unit): F[A] = diff --git a/zio-interop-cats/jvm/src/main/scala/zio/interop/ZioAsync.scala b/zio-interop-cats/jvm/src/main/scala/zio/interop/ZioAsync.scala index bccec946..e6b4b50a 100644 --- a/zio-interop-cats/jvm/src/main/scala/zio/interop/ZioAsync.scala +++ b/zio-interop-cats/jvm/src/main/scala/zio/interop/ZioAsync.scala @@ -1,7 +1,7 @@ package zio.interop import cats.effect.kernel.{ Async, Cont, Sync, Unique } -import zio.{ Exit, Promise, RIO, ZIO } +import zio.{ RIO, ZIO } import scala.concurrent.{ ExecutionContext, Future } @@ -41,17 +41,19 @@ private class ZioAsync[R] ZIO.attemptBlockingInterrupt(thunk) override def async[A](k: (Either[Throwable, A] => Unit) => F[Option[F[Unit]]]): F[A] = - for { - cancelerPromise <- Promise.make[Nothing, Option[F[Unit]]] - res <- ZIO - .asyncZIO[R, Throwable, A] { resume => - k(exitEither => resume(ZIO.fromEither(exitEither))).onExit { - case Exit.Success(maybeCanceler) => cancelerPromise.succeed(maybeCanceler) - case _: Exit.Failure[?] => cancelerPromise.succeed(None) - } - } - .onInterrupt(cancelerPromise.await.flatMap(ZIO.foreach(_)(identity)).orDie) - } yield res + ZIO.suspendSucceed { + val p = scala.concurrent.Promise[Either[Throwable, A]]() + + def get: F[A] = + ZIO.fromFuture(_ => p.future).flatMap[Any, Throwable, A](ZIO.fromEither(_)) + + ZIO.uninterruptibleMask(restore => + k({ e => p.trySuccess(e); () }).flatMap { + case Some(canceler) => onCancel(restore(get), canceler.orDie) + case None => restore(get) + } + ) + } override def async_[A](k: (Either[Throwable, A] => Unit) => Unit): F[A] = ZIO.async(register => k(register.compose(fromEither))) diff --git a/zio-interop-cats/shared/src/main/scala/zio/interop/cats.scala b/zio-interop-cats/shared/src/main/scala/zio/interop/cats.scala index 268e49d0..3e3ecfe5 100644 --- a/zio-interop-cats/shared/src/main/scala/zio/interop/cats.scala +++ b/zio-interop-cats/shared/src/main/scala/zio/interop/cats.scala @@ -221,10 +221,12 @@ private abstract class ZioConcurrent[R, E, E1] extends ZioMonadErrorExit[R, E, E1] with GenConcurrent[ZIO[R, E, _], E1] { - private def toFiber[A](fiber: Fiber[E, A]): effect.Fiber[F, E1, A] = new effect.Fiber[F, E1, A] { - override final val cancel: F[Unit] = fiber.interrupt.unit - override final val join: F[Outcome[F, E1, A]] = fiber.await.map(exitToOutcome) - } + private def toFiber[A](interrupted: zio.Ref[Boolean])(fiber: Fiber[E, A]): effect.Fiber[F, E1, A] = + new effect.Fiber[F, E1, A] { + override final val cancel: F[Unit] = fiber.interrupt.unit + override final val join: F[Outcome[F, E1, A]] = + fiber.await.flatMap[R, E, Outcome[F, E1, A]]((exit: Exit[E, A]) => toOutcomeOtherFiber[A](interrupted)(exit)) + } private def toThrowableOrFiberFailure(error: E): Throwable = error match { @@ -244,11 +246,11 @@ private abstract class ZioConcurrent[R, E, E1] Promise.make[E, A].map(new ZioDeferred(_)) } - override final def start[A](fa: F[A]): F[effect.Fiber[F, E1, A]] = { - implicit def trace: Trace = CoreTracer.newTrace - - fa.interruptible.forkDaemon.map(toFiber) - } + override final def start[A](fa: F[A]): F[effect.Fiber[F, E1, A]] = + for { + interrupted <- zio.Ref.make(true) // fiber could be interrupted before executing a single op + fiber <- signalOnNoExternalInterrupt(fa.interruptible)(interrupted.set(false)).forkDaemon + } yield toFiber(interrupted)(fiber) override def never[A]: F[A] = ZIO.never(CoreTracer.newTrace) @@ -259,7 +261,13 @@ private abstract class ZioConcurrent[R, E, E1] override final def forceR[A, B](fa: F[A])(fb: F[B]): F[B] = { implicit def trace: Trace = CoreTracer.newTrace - fa.foldCauseZIO(cause => if (cause.isInterrupted) ZIO.failCause(cause) else fb, _ => fb) + fa.foldCauseZIO( + cause => + if (cause.isInterrupted) + ZIO.descriptorWith(descriptor => if (descriptor.interrupters.nonEmpty) ZIO.failCause(cause) else fb) + else fb, + _ => fb + ) } override final def uncancelable[A](body: Poll[F] => F[A]): F[A] = { @@ -268,29 +276,62 @@ private abstract class ZioConcurrent[R, E, E1] ZIO.uninterruptibleMask(restore => body(toPoll(restore))) } - override final def canceled: F[Unit] = - ZIO.interrupt(CoreTracer.newTrace) + override final def canceled: F[Unit] = { + def loopUntilInterrupted: UIO[Unit] = + ZIO.descriptorWith(d => if (d.interrupters.isEmpty) ZIO.yieldNow *> loopUntilInterrupted else ZIO.unit) - override final def onCancel[A](fa: F[A], fin: F[Unit]): F[A] = { - implicit def trace: Trace = CoreTracer.newTrace + val maxRetries = 10 + + def getThisFiber(retries: Int, unsafe: Unsafe): ZIO[Any, Nothing, Fiber[Any, Any]] = + ZIO.yieldNow *> // ZIO.yieldNow is necessary to avoid empty result in some cases (unsafeRunToFuture) + ZIO.suspendSucceed { + Fiber.currentFiber()(unsafe) match { + case Some(fiber) => + ZIO.succeedNow(fiber) + case None => + if (retries < maxRetries) + getThisFiber(retries + 1, unsafe) + else + ZIO.succeed( + throw new IllegalStateException( + "Impossible state: current Fiber not found in `zio.Fiber.unsafeCurrentFiber`" + ) + ) + } + } - fa.onError(cause => fin.orDieWith(toThrowableOrFiberFailure).unless(cause.isFailure)) + Unsafe.unsafeCompat { implicit unsafe => + ZIO.suspendSucceed { + for { + thisFiber <- getThisFiber(0, unsafe) + _ <- thisFiber.interruptAs(thisFiber.id).forkDaemon + _ <- loopUntilInterrupted + } yield () + } + } } + override final def onCancel[A](fa: F[A], fin: F[Unit]): F[A] = + guaranteeCase(fa) { case Outcome.Canceled() => fin.orDieWith(toThrowableOrFiberFailure); case _ => ZIO.unit } + override final def memoize[A](fa: F[A]): F[F[A]] = fa.memoize(CoreTracer.newTrace) - override final def racePair[A, B](fa: F[A], fb: F[B]): ZIO[R, Nothing, Either[ - (Outcome[F, E1, A], effect.Fiber[F, E1, B]), - (effect.Fiber[F, E1, A], Outcome[F, E1, B]) - ]] = { - implicit def trace: Trace = CoreTracer.newTrace - - (fa.interruptible raceWith fb.interruptible)( - (exit, fiber) => ZIO.succeedNow(Left((exitToOutcome(exit), toFiber(fiber)))), - (exit, fiber) => ZIO.succeedNow(Right((toFiber(fiber), exitToOutcome(exit)))) - ) - } + override final def racePair[A, B]( + fa: F[A], + fb: F[B] + ): ZIO[R, Nothing, Either[(Outcome[F, E1, A], effect.Fiber[F, E1, B]), (effect.Fiber[F, E1, A], Outcome[F, E1, B])]] = + for { + interruptedA <- zio.Ref.make(true) + interruptedB <- zio.Ref.make(true) + res <- (signalOnNoExternalInterrupt(fa.interruptible)(interruptedA.set(false)) raceWith + signalOnNoExternalInterrupt(fb.interruptible)(interruptedB.set(false)))( + (exit, fiber) => + toOutcomeOtherFiber(interruptedA)(exit).map(outcome => Left((outcome, toFiber(interruptedB)(fiber)))), + (exit, fiber) => + toOutcomeOtherFiber(interruptedB)(exit).map(outcome => Right((toFiber(interruptedA)(fiber), outcome))) + ) + } yield res override final def both[A, B](fa: F[A], fb: F[B]): F[(A, B)] = { implicit def trace: Trace = CoreTracer.newTrace @@ -304,12 +345,47 @@ private abstract class ZioConcurrent[R, E, E1] fa.ensuring(fin.orDieWith(toThrowableOrFiberFailure)) } + override final def guaranteeCase[A](fa: ZIO[R, E, A])( + fin: Outcome[ZIO[R, E, _], E1, A] => ZIO[R, E, Unit] + ): ZIO[R, E, A] = + fa.onExit(exit => toOutcomeThisFiber(exit).flatMap(fin).orDieWith(toThrowableOrFiberFailure)) + override final def bracket[A, B](acquire: F[A])(use: A => F[B])(release: A => F[Unit]): F[B] = { implicit def trace: Trace = InteropTracer.newTrace(use) ZIO.acquireReleaseWith(acquire)(release.andThen(_.orDieWith(toThrowableOrFiberFailure)))(use) } + override final def bracketCase[A, B](acquire: ZIO[R, E, A])(use: A => ZIO[R, E, B])( + release: (A, Outcome[ZIO[R, E, _], E1, B]) => ZIO[R, E, Unit] + ): ZIO[R, E, B] = { + def handleRelease(a: A, exit: Exit[E, B]): URIO[R, Any] = + toOutcomeThisFiber(exit).flatMap(release(a, _)).orDieWith(toThrowableOrFiberFailure) + + ZIO.acquireReleaseExitWith(acquire)(handleRelease)(use) + } + + override final def bracketFull[A, B](acquire: Poll[ZIO[R, E, _]] => ZIO[R, E, A])(use: A => ZIO[R, E, B])( + release: (A, Outcome[ZIO[R, E, _], E1, B]) => ZIO[R, E, Unit] + ): ZIO[R, E, B] = + ZIO.uninterruptibleMask[R, E, B] { restore => + acquire(toPoll(restore)).flatMap { a => + ZIO + .suspendSucceed(restore(use(a))) + .exit + .flatMap { e => + ZIO + .suspendSucceed( + toOutcomeThisFiber(e).flatMap(release(a, _)) + ) + .foldCauseZIO( + cause2 => ZIO.failCause(e.foldExit(_ ++ cause2, _ => cause2)), + _ => ZIO.done(e) + ) + } + } + } + override def unique: F[Unique.Token] = ZIO.succeed(new Unique.Token)(CoreTracer.newTrace) } @@ -576,7 +652,6 @@ private abstract class ZioMonadError[R, E, E1] extends MonadError[ZIO[R, E, _], ZIO.suspendSucceed(loop(a)) } - } private trait ZioMonadErrorE[R, E] extends ZioMonadError[R, E, E] { @@ -615,45 +690,44 @@ private trait ZioMonadErrorE[R, E] extends ZioMonadError[R, E, E] { private trait ZioMonadErrorCause[R, E] extends ZioMonadError[R, E, Cause[E]] { override final def handleErrorWith[A](fa: F[A])(f: Cause[E] => F[A]): F[A] = -// fa.catchAllCause(f) - fa.catchSomeCause { - // pretend that we can't catch inner interrupt to satisfy `uncancelable canceled associates right over flatMap attempt` - // law since we use a poor definition of `canceled=ZIO.interrupt` right now - // https://github.com/zio/interop-cats/issues/503#issuecomment-1157101175= - case c if !c.isInterrupted => f(c) - } + fa.catchAllCause(f) override final def recoverWith[A](fa: F[A])(pf: PartialFunction[Cause[E], F[A]]): F[A] = -// fa.catchSomeCause(pf) - fa.catchSomeCause(({ case c if !c.isInterrupted => c }: PartialFunction[Cause[E], Cause[E]]).andThen(pf)) + fa.catchSomeCause(pf) override final def raiseError[A](e: Cause[E]): F[A] = ZIO.failCause(e) override final def attempt[A](fa: F[A]): F[Either[Cause[E], A]] = -// fa.sandbox.attempt - fa.map(Right(_)).catchSomeCause { - case c if !c.isInterrupted => ZIO.succeedNow(Left(c)) - } + fa.sandbox.either override final def adaptError[A](fa: F[A])(pf: PartialFunction[Cause[E], Cause[E]]): F[A] = fa.mapErrorCause(pf.orElse { case error => error }) } private abstract class ZioMonadErrorExit[R, E, E1] extends ZioMonadError[R, E, E1] { - protected def exitToOutcome[A](exit: Exit[E, A])(implicit trace: Trace): Outcome[F, E1, A] + protected def toOutcomeThisFiber[A](exit: Exit[E, A]): UIO[Outcome[F, E1, A]] + protected def toOutcomeOtherFiber[A](interruptedHandle: zio.Ref[Boolean])(exit: Exit[E, A]): UIO[Outcome[F, E1, A]] } private trait ZioMonadErrorExitThrowable[R] extends ZioMonadErrorExit[R, Throwable, Throwable] with ZioMonadErrorE[R, Throwable] { - override protected def exitToOutcome[A](exit: Exit[Throwable, A])(implicit trace: Trace): Outcome[F, Throwable, A] = - toOutcomeThrowable(exit) + override final protected def toOutcomeThisFiber[A](exit: Exit[Throwable, A]): UIO[Outcome[F, Throwable, A]] = + toOutcomeThrowableThisFiber(exit) + protected final def toOutcomeOtherFiber[A](interruptedHandle: zio.Ref[Boolean])( + exit: Exit[Throwable, A] + ): UIO[Outcome[F, Throwable, A]] = + interruptedHandle.get.map(toOutcomeThrowableOtherFiber(_)(ZIO.succeedNow, exit)) } private trait ZioMonadErrorExitCause[R, E] extends ZioMonadErrorExit[R, E, Cause[E]] with ZioMonadErrorCause[R, E] { - override protected def exitToOutcome[A](exit: Exit[E, A])(implicit trace: Trace): Outcome[F, Cause[E], A] = - toOutcomeCause(exit) + override protected def toOutcomeThisFiber[A](exit: Exit[E, A]): UIO[Outcome[F, Cause[E], A]] = + toOutcomeCauseThisFiber(exit) + protected final def toOutcomeOtherFiber[A](interruptedHandle: zio.Ref[Boolean])( + exit: Exit[E, A] + ): UIO[Outcome[F, Cause[E], A]] = + interruptedHandle.get.map(toOutcomeCauseOtherFiber(_)(ZIO.succeedNow, exit)) } private class ZioSemigroupK[R, E] extends SemigroupK[ZIO[R, E, _]] { diff --git a/zio-interop-cats/shared/src/main/scala/zio/interop/catszmanaged.scala b/zio-interop-cats/shared/src/main/scala/zio/interop/catszmanaged.scala index 14064b94..fef9a100 100644 --- a/zio-interop-cats/shared/src/main/scala/zio/interop/catszmanaged.scala +++ b/zio-interop-cats/shared/src/main/scala/zio/interop/catszmanaged.scala @@ -75,9 +75,7 @@ final class ZIOResourceSyntax[R, E <: Throwable, A](private val resource: Resour ZIO.uninterruptibleMask { restore => allocate.resource(toPoll(restore)) } - } { case ((_, release), exit) => - release(toExitCase(exit)).orDie - }.map(_._1) + } { case ((_, release), exit) => toExitCaseThisFiber(exit).flatMap(t => release(t)).orDie }.map(_._1) case bind: Resource.Bind[F, a, B] => go(bind.source).flatMap(a => go(bind.fs(a))) @@ -127,7 +125,6 @@ final class ScopedSyntax(private val self: Resource.type) extends AnyVal { def scoped[F[_]: Async, R, A]( zio: ZIO[Scope with R, Throwable, A] )(implicit runtime: Runtime[R], trace: Trace): Resource[F, A] = - // import _root_.zio.interop.catz.generic.* scopedZIO[R, Throwable, A](zio).mapK(new (ZIO[R, Throwable, _] ~> F) { override def apply[B](zio: ZIO[R, Throwable, B]) = toEffect(zio) }) diff --git a/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala b/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala index 1b36e73a..03d3bad5 100644 --- a/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala +++ b/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala @@ -20,37 +20,157 @@ import cats.effect.kernel.{ Async, Outcome, Poll, Resource } import cats.effect.std.Dispatcher import cats.syntax.all.* -import scala.concurrent.Future +import java.util.concurrent.atomic.AtomicBoolean package object interop { - @inline private[interop] def toOutcomeCause[R, E, A](exit: Exit[E, A]): Outcome[ZIO[R, E, _], Cause[E], A] = + @inline def fromEffect[F[_], A](fa: F[A])(implicit F: Dispatcher[F]): Task[A] = + ZIO + .succeed(F.unsafeToFutureCancelable(fa)) + .flatMap { case (future, cancel) => + ZIO.fromFuture(_ => future).onInterrupt(ZIO.fromFuture(_ => cancel()).orDie) + } + + @inline def toEffect[F[_], R, A](rio: RIO[R, A])(implicit R: Runtime[R], F: Async[F], trace: Trace): F[A] = + F.defer { + val interrupted = new AtomicBoolean(true) + F.async[Exit[Throwable, A]] { cb => + Unsafe.unsafeCompat { implicit unsafe => + val fiber = R.unsafe.fork { + signalOnNoExternalInterrupt { + rio + }(ZIO.succeed(interrupted.set(false))) + } + fiber.unsafe + .addObserver(exit => cb(Right(exit))) + val cancelerEffect = F.delay { + val _ = fiber.interrupt + } + F.pure(Some(cancelerEffect)) + } + + }.flatMap { exit => + toOutcomeThrowableOtherFiber(interrupted.get())(F.pure(_: A), exit) match { + case Outcome.Succeeded(fa) => + fa + case Outcome.Errored(e) => + F.raiseError(e) + case Outcome.Canceled() => + F.canceled.flatMap(_ => F.raiseError(exit.asInstanceOf[Exit.Failure[Throwable]].cause.squash)) + } + } + } + + implicit class ToEffectSyntax[R, A](private val rio: RIO[R, A]) extends AnyVal { + @inline def toEffect[F[_]: Async](implicit R: Runtime[R], trace: Trace): F[A] = interop.toEffect(rio) + } + + @inline private[interop] def toOutcomeCauseOtherFiber[F[_], E, A]( + actuallyInterrupted: Boolean + )(pure: A => F[A], exit: Exit[E, A]): Outcome[F, Cause[E], A] = exit match { - case Exit.Success(value) => - Outcome.Succeeded(ZIO.succeedNow(value)) - case Exit.Failure(cause) if cause.isInterrupted => + case Exit.Success(value) => + Outcome.Succeeded(pure(value)) + case Exit.Failure(cause) if cause.isInterrupted && actuallyInterrupted => Outcome.Canceled() - case Exit.Failure(cause) => + case Exit.Failure(cause) => Outcome.Errored(cause) } - @inline private[interop] def toOutcomeThrowable[R, A]( - exit: Exit[Throwable, A] - ): Outcome[ZIO[R, Throwable, _], Throwable, A] = + @inline private[interop] def toOutcomeThrowableOtherFiber[F[_], A]( + actuallyInterrupted: Boolean + )(pure: A => F[A], exit: Exit[Throwable, A]): Outcome[F, Throwable, A] = exit match { - case Exit.Success(value) => - Outcome.Succeeded(ZIO.succeedNow(value)) - case Exit.Failure(cause) if cause.isInterrupted => + case Exit.Success(value) => + Outcome.Succeeded(pure(value)) + case Exit.Failure(cause) if cause.isInterrupted && actuallyInterrupted => Outcome.Canceled() - case Exit.Failure(cause) => + case Exit.Failure(cause) => cause.failureOrCause match { - case Left(error) => Outcome.Errored(error) + case Left(error) => + Outcome.Errored(error) case Right(cause) => val compositeError = dieCauseToThrowable(cause) Outcome.Errored(compositeError) } } + @inline private[interop] def toOutcomeCauseThisFiber[R, E, A]( + exit: Exit[E, A] + ): UIO[Outcome[ZIO[R, E, _], Cause[E], A]] = + exit match { + case Exit.Success(value) => + ZIO.succeedNow(Outcome.Succeeded(ZIO.succeedNow(value))) + case Exit.Failure(cause) => + if (cause.isInterrupted) + ZIO.descriptorWith { descriptor => + ZIO.succeedNow( + if (descriptor.interrupters.nonEmpty) + Outcome.Canceled() + else + Outcome.Errored(cause) + ) + } + else ZIO.succeedNow(Outcome.Errored(cause)) + } + + private[interop] def toOutcomeThrowableThisFiber[R, A]( + exit: Exit[Throwable, A] + ): UIO[Outcome[ZIO[R, Throwable, _], Throwable, A]] = + exit match { + case Exit.Success(value) => + ZIO.succeedNow(Outcome.Succeeded(ZIO.succeedNow(value))) + case Exit.Failure(cause) => + def outcomeErrored: Outcome[ZIO[R, Throwable, _], Throwable, A] = + cause.failureOrCause match { + case Left(error) => + Outcome.Errored(error) + case Right(cause) => + val compositeError = dieCauseToThrowable(cause) + Outcome.Errored(compositeError) + } + + if (cause.isInterrupted) + ZIO.descriptorWith { descriptor => + ZIO.succeedNow( + if (descriptor.interrupters.nonEmpty) + Outcome.Canceled() + else + outcomeErrored + ) + } + else ZIO.succeedNow(outcomeErrored) + } + + private[interop] def toExitCaseThisFiber(exit: Exit[Any, Any]): UIO[Resource.ExitCase] = + exit match { + case Exit.Success(_) => + ZIO.succeedNow(Resource.ExitCase.Succeeded) + case Exit.Failure(cause) => + def exitCaseErrored: Resource.ExitCase.Errored = + cause.failureOrCause match { + case Left(error: Throwable) => + Resource.ExitCase.Errored(error) + case Left(_) => + Resource.ExitCase.Errored(FiberFailure(cause)) + case Right(cause) => + val compositeError = dieCauseToThrowable(cause) + Resource.ExitCase.Errored(compositeError) + } + + if (cause.isInterrupted) + ZIO.descriptorWith { descriptor => + ZIO.succeedNow( + if (descriptor.interrupters.nonEmpty) + Resource.ExitCase.Canceled + else + exitCaseErrored + ) + } + else + ZIO.succeedNow(exitCaseErrored) + } + @inline private[interop] def toExit(exitCase: Resource.ExitCase): Exit[Throwable, Unit] = exitCase match { case Resource.ExitCase.Succeeded => Exit.unit @@ -58,54 +178,28 @@ package object interop { case Resource.ExitCase.Errored(error) => Exit.fail(error) } - @inline private[interop] def toExitCase(exit: Exit[Any, Any]): Resource.ExitCase = - exit match { - case Exit.Success(_) => - Resource.ExitCase.Succeeded - case Exit.Failure(cause) if cause.isInterrupted => - Resource.ExitCase.Canceled - case Exit.Failure(cause) => - cause.failureOrCause match { - case Left(error: Throwable) => - Resource.ExitCase.Errored(error) - case Left(_) => - Resource.ExitCase.Errored(FiberFailure(cause)) - case Right(cause) => - val compositeError = dieCauseToThrowable(cause) - Resource.ExitCase.Errored(compositeError) - } - } - - private[interop] def toPoll[R, E](restore: ZIO.InterruptibilityRestorer): Poll[ZIO[R, E, _]] = + @inline private[interop] def toPoll[R, E](restore: ZIO.InterruptibilityRestorer): Poll[ZIO[R, E, _]] = new Poll[ZIO[R, E, _]] { override def apply[T](fa: ZIO[R, E, T]): ZIO[R, E, T] = restore(fa) } + @inline private[interop] def signalOnNoExternalInterrupt[R, E, A]( + f: ZIO[R, E, A] + )(notInterrupted: UIO[Unit]): ZIO[R, E, A] = + f.onExit { + case Exit.Success(_) => ZIO.unit + case Exit.Failure(_) => + // we don't check if cause is interrupted + // because we can get an invalid state Cause.empty + // due to this line https://github.com/zio/zio/blob/22921ee5ac0d2e03531f8b37dfc0d5793a467af8/core/shared/src/main/scala/zio/internal/FiberContext.scala#L415= + // if the last error was an uninterruptible typed error + ZIO.descriptorWith(d => if (d.interrupters.isEmpty) notInterrupted else ZIO.unit) + } + @inline private def dieCauseToThrowable(cause: Cause[Nothing]): Throwable = cause.defects match { case one :: Nil => one case _ => FiberFailure(cause) } - @inline def fromEffect[F[_], A](fa: F[A])(implicit F: Dispatcher[F], trace: Trace): Task[A] = - ZIO - .succeed(F.unsafeToFutureCancelable(fa)) - .flatMap { case (future, cancel) => - ZIO.fromFuture(_ => future).onInterrupt(ZIO.fromFuture(_ => cancel()).orDie) - } - - @inline def toEffect[F[_], R, A]( - rio: RIO[R, A] - )(implicit R: Runtime[R], F: Async[F], trace: Trace): F[A] = - F.uncancelable { poll => - Unsafe.unsafeCompat { implicit u => - F.delay(R.unsafe.runToFuture(rio)).flatMap { future => - poll(F.onCancel(F.fromFuture(F.pure[Future[A]](future)), F.fromFuture(F.delay(future.cancel())).void)) - } - } - } - - implicit class ToEffectSyntax[R, A](private val rio: RIO[R, A]) extends AnyVal { - @inline def toEffect[F[_]: Async](implicit R: Runtime[R], trace: Trace): F[A] = interop.toEffect(rio) - } } diff --git a/zio-test-interop-cats/shared/src/test/scala/zio/test/interop/CatsRunnableSpec.scala b/zio-test-interop-cats/shared/src/test/scala/zio/test/interop/CatsRunnableSpec.scala index 1cf1de25..c2de9edf 100644 --- a/zio-test-interop-cats/shared/src/test/scala/zio/test/interop/CatsRunnableSpec.scala +++ b/zio-test-interop-cats/shared/src/test/scala/zio/test/interop/CatsRunnableSpec.scala @@ -12,8 +12,17 @@ abstract class CatsRunnableSpec extends ZIOSpecDefault { private[this] var openDispatcher: Dispatcher[CIO] = _ private[this] var closeDispatcher: CIO[Unit] = _ - implicit val zioRuntime: Runtime[Any] = - Runtime.default + implicit def zioRuntime: Runtime[Any] = // todo + Runtime( + ZEnvironment.empty, + FiberRefs.empty, + RuntimeFlags( + RuntimeFlag.FiberRoots, + RuntimeFlag.Interruption, + RuntimeFlag.CooperativeYielding, + RuntimeFlag.CurrentFiber + ) + ) implicit val cioRuntime: IORuntime = Scheduler.createDefaultScheduler() match { From b36cbe4f64bd15ea06ab36c3d9c7042d64e9b82d Mon Sep 17 00:00:00 2001 From: johnspade Date: Tue, 2 Aug 2022 00:13:37 +0300 Subject: [PATCH 4/9] Outcome conversion and test fixes #549 https://github.com/zio/interop-cats/pull/549 --- .../scala/zio/interop/CatsInteropSpec.scala | 55 +++++++ .../test/scala/zio/interop/CatsSpecBase.scala | 66 ++++---- .../scala/zio/interop/GenIOInteropCats.scala | 62 ++++++-- .../test/scala/zio/interop/ZioSpecBase.scala | 14 +- .../src/main/scala/zio/interop/cats.scala | 12 +- .../src/main/scala/zio/interop/package.scala | 143 +++++++++--------- 6 files changed, 228 insertions(+), 124 deletions(-) diff --git a/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsInteropSpec.scala b/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsInteropSpec.scala index 44567b29..39078fc3 100644 --- a/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsInteropSpec.scala +++ b/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsInteropSpec.scala @@ -127,6 +127,61 @@ object CatsInteropSpec extends CatsRunnableSpec { res <- counter.get } yield assertTrue(!res.contains("1")) && assertTrue(res == "AC") }, + test( + "onCancel is triggered when a fiber executing ZIO.parTraverse + ZIO.fail is interrupted and the inner typed" + + " error is lost in final Cause (Fail & Interrupt nodes cannot both exist in Cause after external interruption)" + ) { + val F = Concurrent[Task] + + for { + latch1 <- F.deferred[Unit] + latch2 <- F.deferred[Unit] + latch3 <- F.deferred[Unit] + counter <- F.ref("") + cause <- F.ref(Option.empty[Cause[Throwable]]) +// outerScope <- ZIO.forkScope + outerScope <- ZIO.scope + fiber <- F.guaranteeCase( + F.onError( + F.onCancel( + ZIO + .collectAllPar( + List( + F.onCancel( + ZIO.never, + latch2.complete(()).unit + ), + (latch1.complete(()) *> latch3.get).uninterruptible, + counter.update(_ + "A") *> + latch1.get *> + ZIO.fail(new RuntimeException("The_Error")).unit + ) + ) +// .overrideForkScope(outerScope) + .forkScoped + .provideSomeLayer(ZLayer.succeed(outerScope)) // todo how to replace overrideForkScope? + .onExit { + case Exit.Success(_) => ZIO.unit + case Exit.Failure(c) => cause.set(Some(c)).orDie + }, + counter.update(_ + "B") + ) + ) { case _ => counter.update(_ + "1") } + ) { + case Outcome.Errored(_) => counter.update(_ + "2") + case Outcome.Canceled() => counter.update(_ + "C") + case Outcome.Succeeded(_) => counter.update(_ + "3") + }.fork + _ <- latch2.get + _ <- fiber.interrupt + _ <- latch3.complete(()) + res <- counter.get + cause <- cause.get + } yield assertTrue(!res.contains("1")) && + assertTrue(res == "ABC") && + assertTrue(cause.isDefined) && + assertTrue(!cause.get.prettyPrint.contains("The_Error")) + }, test("F.canceled.toEffect results in CancellationException, not BoxedException") { val F = Concurrent[Task] diff --git a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpecBase.scala b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpecBase.scala index e6ec4122..64601c9e 100644 --- a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpecBase.scala +++ b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpecBase.scala @@ -4,7 +4,7 @@ import cats.effect.testkit.TestInstances import cats.effect.kernel.Outcome import cats.effect.IO as CIO import cats.syntax.all.* -import cats.{ Eq, Order } +import cats.{ Eq, Id, Order } import org.scalacheck.{ Arbitrary, Cogen, Gen, Prop } import org.scalatest.funsuite.AnyFunSuite import org.scalatest.prop.Configuration @@ -69,15 +69,18 @@ private[zio] trait CatsSpecBase ??? } - def unsafeRun[E, A](io: IO[E, A])(implicit ticker: Ticker): Exit[E, Option[A]] = + def unsafeRun[E, A](io: IO[E, A])(implicit ticker: Ticker): (Exit[E, Option[A]], Boolean) = try { var exit: Exit[E, Option[A]] = Exit.succeed(Option.empty[A]) + var interrupted: Boolean = true Unsafe.unsafeCompat { implicit u => - val fiber = runtime.unsafe.fork[E, Option[A]](io.asSome) + val fiber = runtime.unsafe.fork[E, Option[A]](signalOnNoExternalInterrupt(io)(ZIO.succeed { + interrupted = false + }).asSome) fiber.unsafe.addObserver(exit = _) } ticker.ctx.tickAll(FiniteDuration(1, TimeUnit.SECONDS)) - exit + (exit, interrupted) } catch { case error: Throwable => error.printStackTrace() @@ -124,22 +127,15 @@ private[zio] trait CatsSpecBase Eq.allEqual implicit val eqForCauseOfNothing: Eq[Cause[Nothing]] = - eqForCauseOf[Nothing] - - implicit def eqForCauseOf[E]: Eq[Cause[E]] = - (x, y) => (x.isInterrupted && y.isInterrupted) || x == y - - implicit def eqForExitOfNothing[A: Eq]: Eq[Exit[Nothing, A]] = { - case (Exit.Success(x), Exit.Success(y)) => x eqv y - case (Exit.Failure(x), Exit.Failure(y)) => x eqv y - case _ => false - } + (x, y) => (x.isInterrupted && y.isInterrupted && x.failureOption.isEmpty && y.failureOption.isEmpty) || x == y implicit def eqForUIO[A: Eq](implicit ticker: Ticker): Eq[UIO[A]] = { (uio1, uio2) => - val exit1 = unsafeRun(uio1) - val exit2 = unsafeRun(uio2) - (exit1 eqv exit2) || { - println(s"$exit1 was not equal to $exit2") + val (exit1, i1) = unsafeRun(uio1) + val (exit2, i2) = unsafeRun(uio2) + val out1 = toOutcomeCauseOtherFiber[Id, Nothing, Option[A]](i1)(identity, exit1) + val out2 = toOutcomeCauseOtherFiber[Id, Nothing, Option[A]](i2)(identity, exit2) + (out1 eqv out2) || { + println(s"$out1 was not equal to $out2") false } } @@ -157,7 +153,7 @@ private[zio] trait CatsSpecBase .toEffect[CIO] implicit def orderForUIOofFiniteDuration(implicit ticker: Ticker): Order[UIO[FiniteDuration]] = - Order.by(unsafeRun(_).toEither.toOption) + Order.by(unsafeRun(_)._1.toEither.toOption) implicit def orderForRIOofFiniteDuration[R: Arbitrary: Tag](implicit ticker: Ticker): Order[RIO[R, FiniteDuration]] = (x, y) => @@ -170,7 +166,7 @@ private[zio] trait CatsSpecBase ticker: Ticker ): Order[ZIO[R, E, FiniteDuration]] = { implicit val orderForIOofFiniteDuration: Order[IO[E, FiniteDuration]] = - Order.by(unsafeRun(_) match { + Order.by(unsafeRun(_)._1 match { case Exit.Success(value) => Right(value) case Exit.Failure(cause) => Left(cause.failureOption) }) @@ -193,12 +189,13 @@ private[zio] trait CatsSpecBase Cogen[Outcome[Option, E, A]].contramap { (zio: ZIO[R, E, A]) => Arbitrary.arbitrary[ZEnvironment[R]].sample match { case Some(r) => - val result = unsafeRun(zio.provideEnvironment(r)) + val (result, extInterrupted) = unsafeRun(zio.provideEnvironment(r)) result match { - case Exit.Failure(cause) if cause.isInterrupted => Outcome.canceled[Option, E, A] - case Exit.Failure(cause) => Outcome.errored(cause.failureOption.get) - case Exit.Success(value) => Outcome.succeeded(value) + case Exit.Failure(cause) => + if (cause.isInterrupted && extInterrupted) Outcome.canceled[Option, E, A] + else Outcome.errored(cause.failureOption.get) + case Exit.Success(value) => Outcome.succeeded(value) } case None => Outcome.succeeded(None) } @@ -206,8 +203,8 @@ private[zio] trait CatsSpecBase implicit def cogenOutcomeZIO[R, A](implicit cogen: Cogen[ZIO[R, Throwable, A]] - ): Cogen[Outcome[ZIO[R, Throwable, *], Throwable, A]] = - cogenOutcome[RIO[R, *], Throwable, A] + ): Cogen[Outcome[ZIO[R, Throwable, _], Throwable, A]] = + cogenOutcome[RIO[R, _], Throwable, A] } private[interop] sealed trait CatsSpecBaseLowPriority { this: CatsSpecBase => @@ -244,6 +241,23 @@ private[interop] sealed trait CatsSpecBaseLowPriority { this: CatsSpecBase => implicit def eqForTaskManaged[A: Eq](implicit ticker: Ticker): Eq[TaskManaged[A]] = zManagedEq[Any, Throwable, A] + implicit def eqForCauseOf[E: Eq]: Eq[Cause[E]] = { (exit1, exit2) => + val out1 = + toOutcomeOtherFiber0[Id, E, Either[E, Cause[Nothing]], Unit](true)(identity, Exit.Failure(exit1))( + (e, _) => Left(e), + Right(_) + ) + val out2 = + toOutcomeOtherFiber0[Id, E, Either[E, Cause[Nothing]], Unit](true)(identity, Exit.Failure(exit2))( + (e, _) => Left(e), + Right(_) + ) + (out1 eqv out2) || { + println(s"cause $out1 was not equal to cause $out2") + false + } + } + implicit def arbitraryZEnvironment[R: Arbitrary: Tag]: Arbitrary[ZEnvironment[R]] = Arbitrary(Arbitrary.arbitrary[R].map(ZEnvironment(_))) } diff --git a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/GenIOInteropCats.scala b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/GenIOInteropCats.scala index d7e8a9e5..dd59a97d 100644 --- a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/GenIOInteropCats.scala +++ b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/GenIOInteropCats.scala @@ -6,15 +6,17 @@ import zio.* trait GenIOInteropCats { - // FIXME generating anything but success (even genFail) - // surfaces multiple further unaddressed law failures + // FIXME `genDie` and `genInternalInterrupt` surface multiple further unaddressed law failures + // See `genDie` scaladoc def betterGenerators: Boolean = false - // FIXME cats conversion surfaces failures in the following laws: - // `async left is uncancelable sequenced raiseError` - // `async right is uncancelable sequenced pure` - // `applicativeError onError raise` - // `canceled sequences onCanceled in order` + // FIXME cats conversion generator works most of the time + // but generates rare law failures in + // - `canceled sequences onCanceled in order` + // - `uncancelable eliminates onCancel` + // - `fiber join is guarantee case` + // possibly coming from the `GenSpawnGenerators#genRacePair` generator + `F.canceled`. + // Errors occur more often when combined with `genOfRace` or `genOfParallel` def catsConversionGenerator: Boolean = false /** @@ -35,9 +37,28 @@ trait GenIOInteropCats { def genFail[E: Arbitrary, A]: Gen[IO[E, A]] = Arbitrary.arbitrary[E].map(ZIO.fail[E](_)) + /** + * We can't pass laws like `cats.effect.laws.GenSpawnLaws#fiberJoinIsGuaranteeCase` + * with either `genDie` or `genInternalInterrupt` because + * we are forced to rethrow an `Outcome.Errored` using + * `raiseError` in `Outcome#embed` which converts the + * specific state into a typed error. + * + * While we consider both states to be `Outcome.Errored`, + * they aren't really 'equivalent' even if we massage them + * into having the same `Outcome`, because `handleErrorWith` + * can't recover from these states. + * + * Now, we could make ZIO Throwable instances recover from + * all errors via [[zio.Cause#squashTraceWith]], but + * this would make Throwable instances contradict the + * generic MonadError instance. + * (Which I believe is acceptable, if confusing, as long + * as the generic instances are moved to a separate `generic` + * object.) + */ def genDie(implicit arbThrowable: Arbitrary[Throwable]): Gen[UIO[Nothing]] = arbThrowable.arbitrary.map(ZIO.die(_)) - - def genInternalInterrupt: Gen[UIO[Nothing]] = ZIO.interrupt + def genInternalInterrupt: Gen[UIO[Nothing]] = ZIO.interrupt def genCancel[E, A: Arbitrary](implicit F: GenConcurrent[IO[E, _], ?]): Gen[IO[E, A]] = Arbitrary.arbitrary[A].map(F.canceled.as(_)) @@ -59,10 +80,12 @@ trait GenIOInteropCats { else Gen.oneOf( genSuccess[E, A], + genFail[E, A], + genCancel[E, A], genNever ) - def genUIO[A: Arbitrary]: Gen[UIO[A]] = + def genUIO[A: Arbitrary](implicit F: GenConcurrent[UIO, ?]): Gen[UIO[A]] = Gen.oneOf(genSuccess[Nothing, A], genIdentityTrans(genSuccess[Nothing, A])) /** @@ -70,7 +93,9 @@ trait GenIOInteropCats { * by using some random combination of the methods `map`, `flatMap`, `mapError`, and any other method that does not change * the success/failure of the value, but may change the value itself. */ - def genLikeTrans[E: Arbitrary: Cogen, A: Arbitrary: Cogen](gen: Gen[IO[E, A]]): Gen[IO[E, A]] = { + def genLikeTrans[E: Arbitrary: Cogen, A: Arbitrary: Cogen]( + gen: Gen[IO[E, A]] + )(implicit F: GenConcurrent[IO[E, _], ?]): Gen[IO[E, A]] = { val functions: IO[E, A] => Gen[IO[E, A]] = io => Gen.oneOf( genOfFlatMaps[E, A](io)(genSuccess[E, A]), @@ -86,7 +111,8 @@ trait GenIOInteropCats { * Given a generator for `IO[E, A]`, produces a sized generator for `IO[E, A]` which represents a transformation, * by using methods that can have no effect on the resulting value (e.g. `map(identity)`, `io.race(never)`, `io.par(io2).map(_._1)`). */ - def genIdentityTrans[E, A: Arbitrary](gen: Gen[IO[E, A]]): Gen[IO[E, A]] = { + def genIdentityTrans[E, A: Arbitrary](gen: Gen[IO[E, A]])(implicit F: GenConcurrent[IO[E, _], ?]): Gen[IO[E, A]] = { + implicitly[Arbitrary[A]] val functions: IO[E, A] => Gen[IO[E, A]] = io => Gen.oneOf( genOfIdentityFlatMaps[E, A](io), @@ -130,9 +156,13 @@ trait GenIOInteropCats { private def genOfIdentityFlatMaps[E, A](io: IO[E, A]): Gen[IO[E, A]] = Gen.const(io.flatMap(a => ZIO.succeed(a))) - private def genOfRace[E, A](io: IO[E, A]): Gen[IO[E, A]] = - Gen.const(io.interruptible.raceFirst(ZIO.never.interruptible)) + private def genOfRace[E, A](io: IO[E, A])(implicit F: GenConcurrent[IO[E, _], ?]): Gen[IO[E, A]] = +// Gen.const(io.interruptible.raceFirst(ZIO.never.interruptible)) + Gen.const(F.race(io, ZIO.never).map(_.merge)) // we must use cats version for Outcome preservation in F.canceled - private def genOfParallel[E, A](io: IO[E, A])(gen: Gen[IO[E, A]]): Gen[IO[E, A]] = - gen.map(parIo => io.interruptible.zipPar(parIo.interruptible).map(_._1)) + private def genOfParallel[E, A](io: IO[E, A])( + gen: Gen[IO[E, A]] + )(implicit F: GenConcurrent[IO[E, _], ?]): Gen[IO[E, A]] = +// gen.map(parIo => io.interruptible.zipPar(parIo.interruptible).map(_._1)) + gen.map(parIO => F.both(io, parIO).map(_._1)) // we must use cats version for Outcome preservation in F.canceled } diff --git a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/ZioSpecBase.scala b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/ZioSpecBase.scala index 43061359..7acafaa0 100644 --- a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/ZioSpecBase.scala +++ b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/ZioSpecBase.scala @@ -1,5 +1,6 @@ package zio.interop +import cats.effect.kernel.Outcome import org.scalacheck.{ Arbitrary, Cogen, Gen } import zio.* import zio.internal.stacktracer.Tracer @@ -7,8 +8,10 @@ import zio.managed.* private[interop] trait ZioSpecBase extends CatsSpecBase with ZioSpecBaseLowPriority with GenIOInteropCats { - implicit def arbitraryUIO[A: Arbitrary]: Arbitrary[UIO[A]] = + implicit def arbitraryUIO[A: Arbitrary]: Arbitrary[UIO[A]] = { + import zio.interop.catz.generic.concurrentInstanceCause Arbitrary(genUIO[A]) + } implicit def arbitraryURIO[R: Cogen: Tag, A: Arbitrary]: Arbitrary[URIO[R, A]] = Arbitrary(Arbitrary.arbitrary[ZEnvironment[R] => UIO[A]].map(ZIO.environment[R].flatMap)) @@ -38,8 +41,13 @@ private[interop] trait ZioSpecBase extends CatsSpecBase with ZioSpecBaseLowPrior Arbitrary(self) } - implicit def cogenCause[E]: Cogen[Cause[E]] = - Cogen(_.hashCode.toLong) + implicit def cogenCause[E: Cogen]: Cogen[Cause[E]] = + Cogen[Outcome[Option, Either[E, Int], Unit]].contramap { cause => + toOutcomeOtherFiber0[Option, E, Either[E, Int], Unit](true)(Option(_), Exit.Failure(cause))( + (e, _) => Left(e), + c => Right(c.hashCode()) + ) + } } private[interop] trait ZioSpecBaseLowPriority { self: ZioSpecBase => diff --git a/zio-interop-cats/shared/src/main/scala/zio/interop/cats.scala b/zio-interop-cats/shared/src/main/scala/zio/interop/cats.scala index 3e3ecfe5..93a8c51c 100644 --- a/zio-interop-cats/shared/src/main/scala/zio/interop/cats.scala +++ b/zio-interop-cats/shared/src/main/scala/zio/interop/cats.scala @@ -333,11 +333,9 @@ private abstract class ZioConcurrent[R, E, E1] ) } yield res - override final def both[A, B](fa: F[A], fb: F[B]): F[(A, B)] = { - implicit def trace: Trace = CoreTracer.newTrace - - fa.interruptible zipPar fb.interruptible - } + // delegate race & both to default implementations, because `raceFirst` & `zipPar` semantics do not match them + override final def race[A, B](fa: F[A], fb: F[B]): F[Either[A, B]] = super.race(fa, fb) + override final def both[A, B](fa: F[A], fb: F[B]): F[(A, B)] = super.both(fa, fb) override final def guarantee[A](fa: F[A], fin: F[Unit]): F[A] = { implicit def trace: Trace = CoreTracer.newTrace @@ -713,8 +711,10 @@ private abstract class ZioMonadErrorExit[R, E, E1] extends ZioMonadError[R, E, E private trait ZioMonadErrorExitThrowable[R] extends ZioMonadErrorExit[R, Throwable, Throwable] with ZioMonadErrorE[R, Throwable] { + override final protected def toOutcomeThisFiber[A](exit: Exit[Throwable, A]): UIO[Outcome[F, Throwable, A]] = toOutcomeThrowableThisFiber(exit) + protected final def toOutcomeOtherFiber[A](interruptedHandle: zio.Ref[Boolean])( exit: Exit[Throwable, A] ): UIO[Outcome[F, Throwable, A]] = @@ -722,8 +722,10 @@ private trait ZioMonadErrorExitThrowable[R] } private trait ZioMonadErrorExitCause[R, E] extends ZioMonadErrorExit[R, E, Cause[E]] with ZioMonadErrorCause[R, E] { + override protected def toOutcomeThisFiber[A](exit: Exit[E, A]): UIO[Outcome[F, Cause[E], A]] = toOutcomeCauseThisFiber(exit) + protected final def toOutcomeOtherFiber[A](interruptedHandle: zio.Ref[Boolean])( exit: Exit[E, A] ): UIO[Outcome[F, Cause[E], A]] = diff --git a/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala b/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala index 03d3bad5..2b08efce 100644 --- a/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala +++ b/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala @@ -68,107 +68,102 @@ package object interop { @inline private[interop] def toOutcomeCauseOtherFiber[F[_], E, A]( actuallyInterrupted: Boolean )(pure: A => F[A], exit: Exit[E, A]): Outcome[F, Cause[E], A] = - exit match { - case Exit.Success(value) => - Outcome.Succeeded(pure(value)) - case Exit.Failure(cause) if cause.isInterrupted && actuallyInterrupted => - Outcome.Canceled() - case Exit.Failure(cause) => - Outcome.Errored(cause) - } + toOutcomeOtherFiber0(actuallyInterrupted)(pure, exit)((_, c) => c, identity) @inline private[interop] def toOutcomeThrowableOtherFiber[F[_], A]( actuallyInterrupted: Boolean )(pure: A => F[A], exit: Exit[Throwable, A]): Outcome[F, Throwable, A] = + toOutcomeOtherFiber0(actuallyInterrupted)(pure, exit)((e, _) => e, dieCauseToThrowable) + + @inline private[interop] def toOutcomeOtherFiber0[F[_], E, E1, A]( + actuallyInterrupted: Boolean + )(pure: A => F[A], exit: Exit[E, A])( + convertFail: (E, Cause[E]) => E1, + convertDie: Cause[Nothing] => E1 + ): Outcome[F, E1, A] = exit match { - case Exit.Success(value) => + case Exit.Success(value) => Outcome.Succeeded(pure(value)) - case Exit.Failure(cause) if cause.isInterrupted && actuallyInterrupted => - Outcome.Canceled() - case Exit.Failure(cause) => + case Exit.Failure(cause) => cause.failureOrCause match { - case Left(error) => - Outcome.Errored(error) - case Right(cause) => - val compositeError = dieCauseToThrowable(cause) - Outcome.Errored(compositeError) + // if we have a typed failure then we're guaranteed to not be interrupting, + // typed failure absence is guaranteed by this line https://github.com/zio/zio/blob/22921ee5ac0d2e03531f8b37dfc0d5793a467af8/core/shared/src/main/scala/zio/internal/FiberContext.scala#L415= + case Left(error) => + Outcome.Errored(convertFail(error, cause)) + // deem empty cause to be interruption as well, due to occasional invalid ZIO states + // in `ZIO.fail().uninterruptible` caused by this line https://github.com/zio/zio/blob/22921ee5ac0d2e03531f8b37dfc0d5793a467af8/core/shared/src/main/scala/zio/internal/FiberContext.scala#L415= + case Right(cause) if (cause.isInterrupted || cause.isEmpty) && actuallyInterrupted => + Outcome.Canceled() + case Right(cause) => + Outcome.Errored(convertDie(cause)) } } @inline private[interop] def toOutcomeCauseThisFiber[R, E, A]( exit: Exit[E, A] ): UIO[Outcome[ZIO[R, E, _], Cause[E], A]] = - exit match { - case Exit.Success(value) => - ZIO.succeedNow(Outcome.Succeeded(ZIO.succeedNow(value))) - case Exit.Failure(cause) => - if (cause.isInterrupted) - ZIO.descriptorWith { descriptor => - ZIO.succeedNow( - if (descriptor.interrupters.nonEmpty) - Outcome.Canceled() - else - Outcome.Errored(cause) - ) - } - else ZIO.succeedNow(Outcome.Errored(cause)) - } + toOutcomeThisFiber0(exit)((_, c) => c, identity) - private[interop] def toOutcomeThrowableThisFiber[R, A]( + @inline private[interop] def toOutcomeThrowableThisFiber[R, A]( exit: Exit[Throwable, A] ): UIO[Outcome[ZIO[R, Throwable, _], Throwable, A]] = - exit match { - case Exit.Success(value) => - ZIO.succeedNow(Outcome.Succeeded(ZIO.succeedNow(value))) - case Exit.Failure(cause) => - def outcomeErrored: Outcome[ZIO[R, Throwable, _], Throwable, A] = - cause.failureOrCause match { - case Left(error) => - Outcome.Errored(error) - case Right(cause) => - val compositeError = dieCauseToThrowable(cause) - Outcome.Errored(compositeError) - } - - if (cause.isInterrupted) + toOutcomeThisFiber0(exit)((e, _) => e, dieCauseToThrowable) + + @inline private def toOutcomeThisFiber0[R, E, E1, A](exit: Exit[E, A])( + convertFail: (E, Cause[E]) => E1, + convertDie: Cause[Nothing] => E1 + ): UIO[Outcome[ZIO[R, E, _], E1, A]] = exit match { + case Exit.Success(value) => + ZIO.succeedNow(Outcome.Succeeded(ZIO.succeedNow(value))) + case Exit.Failure(cause) => + cause.failureOrCause match { + // if we have a typed failure then we're guaranteed to not be interrupting, + // typed failure absence is guaranteed by this line https://github.com/zio/zio/blob/22921ee5ac0d2e03531f8b37dfc0d5793a467af8/core/shared/src/main/scala/zio/internal/FiberContext.scala#L415= + case Left(error) => + ZIO.succeedNow(Outcome.Errored(convertFail(error, cause))) + // deem empty cause to be interruption as well, due to occasional invalid ZIO states + // in `ZIO.fail().uninterruptible` caused by this line https://github.com/zio/zio/blob/22921ee5ac0d2e03531f8b37dfc0d5793a467af8/core/shared/src/main/scala/zio/internal/FiberContext.scala#L415= + case Right(cause) if cause.isInterrupted || cause.isEmpty => ZIO.descriptorWith { descriptor => ZIO.succeedNow( if (descriptor.interrupters.nonEmpty) Outcome.Canceled() - else - outcomeErrored + else { + Outcome.Errored(convertDie(cause)) + } ) } - else ZIO.succeedNow(outcomeErrored) - } + case Right(cause) => + ZIO.succeedNow(Outcome.Errored(convertDie(cause))) + } + } private[interop] def toExitCaseThisFiber(exit: Exit[Any, Any]): UIO[Resource.ExitCase] = exit match { case Exit.Success(_) => ZIO.succeedNow(Resource.ExitCase.Succeeded) case Exit.Failure(cause) => - def exitCaseErrored: Resource.ExitCase.Errored = - cause.failureOrCause match { - case Left(error: Throwable) => - Resource.ExitCase.Errored(error) - case Left(_) => - Resource.ExitCase.Errored(FiberFailure(cause)) - case Right(cause) => - val compositeError = dieCauseToThrowable(cause) - Resource.ExitCase.Errored(compositeError) - } - - if (cause.isInterrupted) - ZIO.descriptorWith { descriptor => - ZIO.succeedNow( - if (descriptor.interrupters.nonEmpty) - Resource.ExitCase.Canceled - else - exitCaseErrored - ) - } - else - ZIO.succeedNow(exitCaseErrored) + cause.failureOrCause match { + // if we have a typed failure then we're guaranteed to not be interrupting, + // typed failure absence is guaranteed by this line https://github.com/zio/zio/blob/22921ee5ac0d2e03531f8b37dfc0d5793a467af8/core/shared/src/main/scala/zio/internal/FiberContext.scala#L415= + case Left(error: Throwable) => + ZIO.succeedNow(Resource.ExitCase.Errored(error)) + case Left(_) => + ZIO.succeedNow(Resource.ExitCase.Errored(FiberFailure(cause))) + // deem empty cause to be interruption as well, due to occasional invalid ZIO states + // in `ZIO.fail().uninterruptible` caused by this line https://github.com/zio/zio/blob/22921ee5ac0d2e03531f8b37dfc0d5793a467af8/core/shared/src/main/scala/zio/internal/FiberContext.scala#L415= + case Right(cause) if cause.isInterrupted || cause.isEmpty => + ZIO.descriptorWith { descriptor => + ZIO.succeedNow { + if (descriptor.interrupters.nonEmpty) { + Resource.ExitCase.Canceled + } else + Resource.ExitCase.Errored(dieCauseToThrowable(cause)) + } + } + case Right(cause) => + ZIO.succeedNow(Resource.ExitCase.Errored(dieCauseToThrowable(cause))) + } } @inline private[interop] def toExit(exitCase: Resource.ExitCase): Exit[Throwable, Unit] = @@ -196,7 +191,7 @@ package object interop { ZIO.descriptorWith(d => if (d.interrupters.isEmpty) notInterrupted else ZIO.unit) } - @inline private def dieCauseToThrowable(cause: Cause[Nothing]): Throwable = + @inline private[interop] def dieCauseToThrowable(cause: Cause[Nothing]): Throwable = cause.defects match { case one :: Nil => one case _ => FiberFailure(cause) From 3683defeaf623a2afaed056a26f5ee0bf42cfa66 Mon Sep 17 00:00:00 2001 From: Kai <450507+neko-kai@users.noreply.github.com> Date: Tue, 16 Aug 2022 22:17:36 +0100 Subject: [PATCH 5/9] Fix CatsInteropSpec, redefine toOutcome for ZIO2 - ZIO2 _DOES_ preserve typed errors in the same Cause as external interruptions, so previous definition was incorrect MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit There remain test failures in 'canceled sequences onCancel in order' – they are occur when `genOfRace`/`genOfParallel` or `genCancel` occurs, so something might still be wrong with outcome conversion in these case OR there may be bugs in ZIO 2 (or some more tricky behavior) --- .../scala/zio/interop/CatsInteropSpec.scala | 96 ++++++++------- .../scala/zio/interop/GenIOInteropCats.scala | 2 +- .../src/main/scala/zio/interop/cats.scala | 36 +----- .../src/main/scala/zio/interop/package.scala | 112 +++++++++++------- 4 files changed, 124 insertions(+), 122 deletions(-) diff --git a/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsInteropSpec.scala b/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsInteropSpec.scala index 39078fc3..cf27333b 100644 --- a/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsInteropSpec.scala +++ b/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsInteropSpec.scala @@ -129,58 +129,64 @@ object CatsInteropSpec extends CatsRunnableSpec { }, test( "onCancel is triggered when a fiber executing ZIO.parTraverse + ZIO.fail is interrupted and the inner typed" + - " error is lost in final Cause (Fail & Interrupt nodes cannot both exist in Cause after external interruption)" + " error is, unlike ZIO 1, preserved in final Cause (in ZIO 1 Fail & Interrupt nodes CAN both exist in Cause after external interruption)" ) { val F = Concurrent[Task] + def println(s: String): Unit = { + val _ = s + } + for { - latch1 <- F.deferred[Unit] - latch2 <- F.deferred[Unit] - latch3 <- F.deferred[Unit] - counter <- F.ref("") - cause <- F.ref(Option.empty[Cause[Throwable]]) -// outerScope <- ZIO.forkScope - outerScope <- ZIO.scope - fiber <- F.guaranteeCase( - F.onError( - F.onCancel( - ZIO - .collectAllPar( - List( - F.onCancel( - ZIO.never, - latch2.complete(()).unit - ), - (latch1.complete(()) *> latch3.get).uninterruptible, - counter.update(_ + "A") *> - latch1.get *> - ZIO.fail(new RuntimeException("The_Error")).unit - ) - ) -// .overrideForkScope(outerScope) - .forkScoped - .provideSomeLayer(ZLayer.succeed(outerScope)) // todo how to replace overrideForkScope? - .onExit { - case Exit.Success(_) => ZIO.unit - case Exit.Failure(c) => cause.set(Some(c)).orDie - }, - counter.update(_ + "B") - ) - ) { case _ => counter.update(_ + "1") } - ) { - case Outcome.Errored(_) => counter.update(_ + "2") - case Outcome.Canceled() => counter.update(_ + "C") - case Outcome.Succeeded(_) => counter.update(_ + "3") - }.fork - _ <- latch2.get - _ <- fiber.interrupt - _ <- latch3.complete(()) - res <- counter.get - cause <- cause.get + latch1 <- F.deferred[Unit] + latch2 <- F.deferred[Unit] + latch3 <- F.deferred[Unit] + counter <- F.ref("") + cause <- F.ref(Option.empty[Cause[Throwable]]) + fiberId <- ZIO.fiberId + fiber <- F.guaranteeCase( + F.onError( + F.onCancel( + ZIO + .collectAllPar( + List( + F.onCancel( + ZIO.never, + ZIO.succeed(println("A")) *> latch2.complete(()).unit + ).onExit(_ => ZIO.succeed(println("XA"))), + (latch1.complete(()) *> latch3.get *> ZIO.succeed(println("C"))).uninterruptible, + counter.update(_ + "A") *> + latch1.get *> + ZIO.succeed(println("B")) *> ZIO.fail(new RuntimeException("The_Error")).unit + ) + ) + .onExit { + case Exit.Success(_) => ZIO.unit + case Exit.Failure(c) => cause.set(Some(c)).orDie + }, + counter.update(_ + "B") + ) + ) { case _ => counter.update(_ + "1") } + ) { + case Outcome.Errored(_) => counter.update(_ + "2") + case Outcome.Canceled() => counter.update(_ + "C") + case Outcome.Succeeded(_) => counter.update(_ + "3") + }.fork + _ = println("x1") + _ <- latch2.get + _ = println("x2") + _ <- fiber.interruptFork + _ = println("x3") + _ <- latch3.complete(()) + _ <- fiber.interrupt + _ = println("x4") + res <- counter.get + cause <- cause.get } yield assertTrue(!res.contains("1")) && assertTrue(res == "ABC") && assertTrue(cause.isDefined) && - assertTrue(!cause.get.prettyPrint.contains("The_Error")) + assertTrue(cause.get.prettyPrint.contains("The_Error")) && + assertTrue(cause.get.interruptors.contains(fiberId)) }, test("F.canceled.toEffect results in CancellationException, not BoxedException") { val F = Concurrent[Task] diff --git a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/GenIOInteropCats.scala b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/GenIOInteropCats.scala index dd59a97d..b928a61c 100644 --- a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/GenIOInteropCats.scala +++ b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/GenIOInteropCats.scala @@ -12,7 +12,7 @@ trait GenIOInteropCats { // FIXME cats conversion generator works most of the time // but generates rare law failures in - // - `canceled sequences onCanceled in order` + // - `canceled sequences onCancel in order` // - `uncancelable eliminates onCancel` // - `fiber join is guarantee case` // possibly coming from the `GenSpawnGenerators#genRacePair` generator + `F.canceled`. diff --git a/zio-interop-cats/shared/src/main/scala/zio/interop/cats.scala b/zio-interop-cats/shared/src/main/scala/zio/interop/cats.scala index 93a8c51c..29f6193c 100644 --- a/zio-interop-cats/shared/src/main/scala/zio/interop/cats.scala +++ b/zio-interop-cats/shared/src/main/scala/zio/interop/cats.scala @@ -27,9 +27,8 @@ import zio.{ Fiber, Ref as ZRef, ZEnvironment } import zio.* import zio.Clock.{ currentTime, nanoTime } import zio.Duration - import zio.internal.stacktracer.InteropTracer -import zio.internal.stacktracer.{ Tracer => CoreTracer } +import zio.internal.stacktracer.Tracer as CoreTracer import java.util.concurrent.atomic.AtomicBoolean import scala.concurrent.{ ExecutionContext, Future } @@ -280,35 +279,10 @@ private abstract class ZioConcurrent[R, E, E1] def loopUntilInterrupted: UIO[Unit] = ZIO.descriptorWith(d => if (d.interrupters.isEmpty) ZIO.yieldNow *> loopUntilInterrupted else ZIO.unit) - val maxRetries = 10 - - def getThisFiber(retries: Int, unsafe: Unsafe): ZIO[Any, Nothing, Fiber[Any, Any]] = - ZIO.yieldNow *> // ZIO.yieldNow is necessary to avoid empty result in some cases (unsafeRunToFuture) - ZIO.suspendSucceed { - Fiber.currentFiber()(unsafe) match { - case Some(fiber) => - ZIO.succeedNow(fiber) - case None => - if (retries < maxRetries) - getThisFiber(retries + 1, unsafe) - else - ZIO.succeed( - throw new IllegalStateException( - "Impossible state: current Fiber not found in `zio.Fiber.unsafeCurrentFiber`" - ) - ) - } - } - - Unsafe.unsafeCompat { implicit unsafe => - ZIO.suspendSucceed { - for { - thisFiber <- getThisFiber(0, unsafe) - _ <- thisFiber.interruptAs(thisFiber.id).forkDaemon - _ <- loopUntilInterrupted - } yield () - } - } + for { + _ <- ZIO.withFiberRuntime[Any, Nothing, Unit]((thisFiber, _) => thisFiber.interruptAsFork(thisFiber.id)) + _ <- loopUntilInterrupted + } yield () } override final def onCancel[A](fa: F[A], fin: F[Unit]): F[A] = diff --git a/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala b/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala index 2b08efce..228fd549 100644 --- a/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala +++ b/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala @@ -85,17 +85,25 @@ package object interop { case Exit.Success(value) => Outcome.Succeeded(pure(value)) case Exit.Failure(cause) => - cause.failureOrCause match { - // if we have a typed failure then we're guaranteed to not be interrupting, - // typed failure absence is guaranteed by this line https://github.com/zio/zio/blob/22921ee5ac0d2e03531f8b37dfc0d5793a467af8/core/shared/src/main/scala/zio/internal/FiberContext.scala#L415= - case Left(error) => - Outcome.Errored(convertFail(error, cause)) - // deem empty cause to be interruption as well, due to occasional invalid ZIO states - // in `ZIO.fail().uninterruptible` caused by this line https://github.com/zio/zio/blob/22921ee5ac0d2e03531f8b37dfc0d5793a467af8/core/shared/src/main/scala/zio/internal/FiberContext.scala#L415= - case Right(cause) if (cause.isInterrupted || cause.isEmpty) && actuallyInterrupted => - Outcome.Canceled() - case Right(cause) => - Outcome.Errored(convertDie(cause)) + // ZIO 2, unlike ZIO 1, _does not_ guarantee that the presence of a typed failure + // means we're NOT interrupting, so we have to check for interruption to matter what + if ( + (cause.isInterrupted || { + // deem empty cause to be interruption as well, due to occasional invalid ZIO states + // in `ZIO.fail().uninterruptible` caused by this line https://github.com/zio/zio/blob/22921ee5ac0d2e03531f8b37dfc0d5793a467af8/core/shared/src/main/scala/zio/internal/FiberContext.scala#L415= + // NOTE: this line is for ZIO 1, it may not apply for ZIO 2, someone needs to debunk + // whether this is required + cause.isEmpty + }) && actuallyInterrupted + ) { + Outcome.Canceled() + } else { + cause.failureOrCause match { + case Left(error) => + Outcome.Errored(convertFail(error, cause)) + case Right(cause) => + Outcome.Errored(convertDie(cause)) + } } } @@ -116,26 +124,33 @@ package object interop { case Exit.Success(value) => ZIO.succeedNow(Outcome.Succeeded(ZIO.succeedNow(value))) case Exit.Failure(cause) => - cause.failureOrCause match { - // if we have a typed failure then we're guaranteed to not be interrupting, - // typed failure absence is guaranteed by this line https://github.com/zio/zio/blob/22921ee5ac0d2e03531f8b37dfc0d5793a467af8/core/shared/src/main/scala/zio/internal/FiberContext.scala#L415= - case Left(error) => + lazy val nonCanceledOutcome: UIO[Outcome[ZIO[R, E, _], E1, A]] = cause.failureOrCause match { + case Left(error) => ZIO.succeedNow(Outcome.Errored(convertFail(error, cause))) - // deem empty cause to be interruption as well, due to occasional invalid ZIO states - // in `ZIO.fail().uninterruptible` caused by this line https://github.com/zio/zio/blob/22921ee5ac0d2e03531f8b37dfc0d5793a467af8/core/shared/src/main/scala/zio/internal/FiberContext.scala#L415= - case Right(cause) if cause.isInterrupted || cause.isEmpty => - ZIO.descriptorWith { descriptor => - ZIO.succeedNow( - if (descriptor.interrupters.nonEmpty) - Outcome.Canceled() - else { - Outcome.Errored(convertDie(cause)) - } - ) - } - case Right(cause) => + case Right(cause) => ZIO.succeedNow(Outcome.Errored(convertDie(cause))) } + // ZIO 2, unlike ZIO 1, _does not_ guarantee that the presence of a typed failure + // means we're NOT interrupting, so we have to check for interruption to matter what + if ( + cause.isInterrupted || { + // deem empty cause to be interruption as well, due to occasional invalid ZIO states + // in `ZIO.fail().uninterruptible` caused by this line https://github.com/zio/zio/blob/22921ee5ac0d2e03531f8b37dfc0d5793a467af8/core/shared/src/main/scala/zio/internal/FiberContext.scala#L415= + // NOTE: this line is for ZIO 1, it may not apply for ZIO 2, someone needs to debunk + // whether this is required + cause.isEmpty + } + ) { + ZIO.descriptorWith { descriptor => + if (descriptor.interrupters.nonEmpty) + ZIO.succeedNow(Outcome.Canceled()) + else { + nonCanceledOutcome + } + } + } else { + nonCanceledOutcome + } } private[interop] def toExitCaseThisFiber(exit: Exit[Any, Any]): UIO[Resource.ExitCase] = @@ -143,27 +158,34 @@ package object interop { case Exit.Success(_) => ZIO.succeedNow(Resource.ExitCase.Succeeded) case Exit.Failure(cause) => - cause.failureOrCause match { - // if we have a typed failure then we're guaranteed to not be interrupting, - // typed failure absence is guaranteed by this line https://github.com/zio/zio/blob/22921ee5ac0d2e03531f8b37dfc0d5793a467af8/core/shared/src/main/scala/zio/internal/FiberContext.scala#L415= - case Left(error: Throwable) => + lazy val nonCanceledOutcome: UIO[Resource.ExitCase] = cause.failureOrCause match { + case Left(error: Throwable) => ZIO.succeedNow(Resource.ExitCase.Errored(error)) - case Left(_) => + case Left(_) => ZIO.succeedNow(Resource.ExitCase.Errored(FiberFailure(cause))) - // deem empty cause to be interruption as well, due to occasional invalid ZIO states - // in `ZIO.fail().uninterruptible` caused by this line https://github.com/zio/zio/blob/22921ee5ac0d2e03531f8b37dfc0d5793a467af8/core/shared/src/main/scala/zio/internal/FiberContext.scala#L415= - case Right(cause) if cause.isInterrupted || cause.isEmpty => - ZIO.descriptorWith { descriptor => - ZIO.succeedNow { - if (descriptor.interrupters.nonEmpty) { - Resource.ExitCase.Canceled - } else - Resource.ExitCase.Errored(dieCauseToThrowable(cause)) - } - } - case Right(cause) => + case Right(cause) => ZIO.succeedNow(Resource.ExitCase.Errored(dieCauseToThrowable(cause))) } + // ZIO 2, unlike ZIO 1, _does not_ guarantee that the presence of a typed failure + // means we're NOT interrupting, so we have to check for interruption to matter what + if ( + cause.isInterrupted || { + // deem empty cause to be interruption as well, due to occasional invalid ZIO states + // in `ZIO.fail().uninterruptible` caused by this line https://github.com/zio/zio/blob/22921ee5ac0d2e03531f8b37dfc0d5793a467af8/core/shared/src/main/scala/zio/internal/FiberContext.scala#L415= + // NOTE: this line is for ZIO 1, it may not apply for ZIO 2, someone needs to debunk + // whether this is required + cause.isEmpty + } + ) { + ZIO.descriptorWith { descriptor => + if (descriptor.interrupters.nonEmpty) { + ZIO.succeedNow(Resource.ExitCase.Canceled) + } else + nonCanceledOutcome + } + } else { + nonCanceledOutcome + } } @inline private[interop] def toExit(exitCase: Resource.ExitCase): Exit[Throwable, Unit] = From 6b949ed621a968573365273d0a4b4d2bd84d3a6e Mon Sep 17 00:00:00 2001 From: ilopatin Date: Thu, 18 Aug 2022 13:56:13 +0300 Subject: [PATCH 6/9] Remove todos --- .../scala/zio/interop/CatsZManagedSyntaxSpec.scala | 2 +- .../src/test/scala/zio/interop/CatsSpecBase.scala | 7 +------ .../scala/zio/test/interop/CatsRunnableSpec.scala | 13 ++----------- 3 files changed, 4 insertions(+), 18 deletions(-) diff --git a/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsZManagedSyntaxSpec.scala b/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsZManagedSyntaxSpec.scala index b3c62da7..ad70d8a0 100644 --- a/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsZManagedSyntaxSpec.scala +++ b/zio-interop-cats-tests/jvm/src/test/scala/zio/interop/CatsZManagedSyntaxSpec.scala @@ -334,7 +334,7 @@ object CatsZManagedSyntaxSpec extends CatsRunnableSpec { effects <- ZIO.succeed(effects.toList) } yield assert(effects)(equalTo(List(1, 2))) }, - test("calls finalizers when using resource is externally interrupted") { // todo + test("calls finalizers when using resource is externally interrupted") { val effects = new mutable.ListBuffer[Int] def man(x: Int): ZManaged[Any, Throwable, Unit] = ZManaged.acquireReleaseExitWith(ZIO.succeed(effects += x).unit) { diff --git a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpecBase.scala b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpecBase.scala index 64601c9e..d87dafa9 100644 --- a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpecBase.scala +++ b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpecBase.scala @@ -97,12 +97,7 @@ private[zio] trait CatsSpecBase FiberRef.currentBlockingExecutor -> ::(fiberId -> blockingExecutor, Nil) ) ) - val runtimeFlags = RuntimeFlags( // todo - RuntimeFlag.FiberRoots, - RuntimeFlag.Interruption, - RuntimeFlag.CooperativeYielding, - RuntimeFlag.CurrentFiber - ) + val runtimeFlags = RuntimeFlags.default Runtime(ZEnvironment.empty, fiberRefs, runtimeFlags) } diff --git a/zio-test-interop-cats/shared/src/test/scala/zio/test/interop/CatsRunnableSpec.scala b/zio-test-interop-cats/shared/src/test/scala/zio/test/interop/CatsRunnableSpec.scala index c2de9edf..1cf1de25 100644 --- a/zio-test-interop-cats/shared/src/test/scala/zio/test/interop/CatsRunnableSpec.scala +++ b/zio-test-interop-cats/shared/src/test/scala/zio/test/interop/CatsRunnableSpec.scala @@ -12,17 +12,8 @@ abstract class CatsRunnableSpec extends ZIOSpecDefault { private[this] var openDispatcher: Dispatcher[CIO] = _ private[this] var closeDispatcher: CIO[Unit] = _ - implicit def zioRuntime: Runtime[Any] = // todo - Runtime( - ZEnvironment.empty, - FiberRefs.empty, - RuntimeFlags( - RuntimeFlag.FiberRoots, - RuntimeFlag.Interruption, - RuntimeFlag.CooperativeYielding, - RuntimeFlag.CurrentFiber - ) - ) + implicit val zioRuntime: Runtime[Any] = + Runtime.default implicit val cioRuntime: IORuntime = Scheduler.createDefaultScheduler() match { From 5b18ab686b14f406b9d563820db62a7c30bc27e6 Mon Sep 17 00:00:00 2001 From: ilopatin Date: Wed, 24 Aug 2022 17:34:30 +0300 Subject: [PATCH 7/9] Update to ZIO 2.0.1 --- .../shared/src/main/scala/zio/interop/package.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala b/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala index 228fd549..710d6305 100644 --- a/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala +++ b/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala @@ -35,7 +35,7 @@ package object interop { F.defer { val interrupted = new AtomicBoolean(true) F.async[Exit[Throwable, A]] { cb => - Unsafe.unsafeCompat { implicit unsafe => + Unsafe.unsafe { implicit unsafe => val fiber = R.unsafe.fork { signalOnNoExternalInterrupt { rio From 4acfe79baec95e4fddd2430c9f6d3e6e9b276afa Mon Sep 17 00:00:00 2001 From: ilopatin Date: Sun, 18 Sep 2022 21:21:20 +0300 Subject: [PATCH 8/9] Remove genNever --- .../src/test/scala/zio/interop/GenIOInteropCats.scala | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/GenIOInteropCats.scala b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/GenIOInteropCats.scala index b928a61c..921fb24e 100644 --- a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/GenIOInteropCats.scala +++ b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/GenIOInteropCats.scala @@ -63,8 +63,6 @@ trait GenIOInteropCats { def genCancel[E, A: Arbitrary](implicit F: GenConcurrent[IO[E, _], ?]): Gen[IO[E, A]] = Arbitrary.arbitrary[A].map(F.canceled.as(_)) - def genNever: Gen[UIO[Nothing]] = ZIO.never - def genIO[E: Arbitrary, A: Arbitrary](implicit arbThrowable: Arbitrary[Throwable], F: GenConcurrent[IO[E, _], ?] @@ -74,15 +72,13 @@ trait GenIOInteropCats { genFail[E, A], genDie, genInternalInterrupt, - genCancel[E, A], - genNever + genCancel[E, A] ) else Gen.oneOf( genSuccess[E, A], genFail[E, A], - genCancel[E, A], - genNever + genCancel[E, A] ) def genUIO[A: Arbitrary](implicit F: GenConcurrent[UIO, ?]): Gen[UIO[A]] = From f13c6c03d93ea523ae9b0c20750ae110c237e9a6 Mon Sep 17 00:00:00 2001 From: ilopatin Date: Fri, 23 Sep 2022 20:05:27 +0300 Subject: [PATCH 9/9] Update eqForUIO --- .../src/test/scala/zio/interop/CatsSpecBase.scala | 10 +++++++--- .../src/test/scala/zio/interop/GenIOInteropCats.scala | 4 ++++ 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpecBase.scala b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpecBase.scala index 86aa77e1..57b0cc61 100644 --- a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpecBase.scala +++ b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/CatsSpecBase.scala @@ -3,6 +3,7 @@ package zio.interop import cats.effect.testkit.TestInstances import cats.effect.kernel.Outcome import cats.effect.IO as CIO +import cats.effect.kernel.Outcome.Succeeded import cats.syntax.all.* import cats.{ Eq, Id, Order } import org.scalacheck.{ Arbitrary, Cogen, Gen, Prop } @@ -129,9 +130,12 @@ private[zio] trait CatsSpecBase val (exit2, i2) = unsafeRun(uio2) val out1 = toOutcomeCauseOtherFiber[Id, Nothing, Option[A]](i1)(identity, exit1) val out2 = toOutcomeCauseOtherFiber[Id, Nothing, Option[A]](i2)(identity, exit2) - (out1 eqv out2) || { - println(s"$out1 was not equal to $out2") - false + (out1, out2) match { + case (Succeeded(Some(a)), Succeeded(Some(b))) => a eqv b + case (Succeeded(Some(_)), _) | (_, Succeeded(Some(_))) => + println(s"$out1 was not equal to $out2") + false + case _ => true } } diff --git a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/GenIOInteropCats.scala b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/GenIOInteropCats.scala index 921fb24e..5c6534e2 100644 --- a/zio-interop-cats-tests/shared/src/test/scala/zio/interop/GenIOInteropCats.scala +++ b/zio-interop-cats-tests/shared/src/test/scala/zio/interop/GenIOInteropCats.scala @@ -63,6 +63,8 @@ trait GenIOInteropCats { def genCancel[E, A: Arbitrary](implicit F: GenConcurrent[IO[E, _], ?]): Gen[IO[E, A]] = Arbitrary.arbitrary[A].map(F.canceled.as(_)) + def genNever: Gen[UIO[Nothing]] = ZIO.never + def genIO[E: Arbitrary, A: Arbitrary](implicit arbThrowable: Arbitrary[Throwable], F: GenConcurrent[IO[E, _], ?] @@ -72,12 +74,14 @@ trait GenIOInteropCats { genFail[E, A], genDie, genInternalInterrupt, + genNever, genCancel[E, A] ) else Gen.oneOf( genSuccess[E, A], genFail[E, A], + genNever, genCancel[E, A] )