From ff945ac6e327dea39f24d4dca7eb5c73f41b0c89 Mon Sep 17 00:00:00 2001 From: Dan Di Spaltro Date: Fri, 7 Jun 2024 10:52:24 -0700 Subject: [PATCH] Attempt to handle the Dispatcher case --- build.sbt | 3 +- .../cats3/CleanSchedulerContextAdvice35.java | 35 +++ .../src/main/resources/reference.conf | 4 +- .../effect/kamonCats/PackageAccessor.scala | 41 +++ .../cats3/IOFiberInstrumentation.scala | 88 ++++-- .../cats3/CatsIoInstrumentationSpec.scala | 253 +++++++++++++++++- 6 files changed, 397 insertions(+), 27 deletions(-) create mode 100644 instrumentation/kamon-cats-io-3/src/main/java/kamon/instrumentation/cats3/CleanSchedulerContextAdvice35.java create mode 100644 instrumentation/kamon-cats-io-3/src/main/scala/cats/effect/kamonCats/PackageAccessor.scala diff --git a/build.sbt b/build.sbt index 0af2d9bf9..2b3f5099a 100644 --- a/build.sbt +++ b/build.sbt @@ -267,7 +267,8 @@ lazy val `kamon-cats-io-3` = (project in file("instrumentation/kamon-cats-io-3") crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version), libraryDependencies ++= Seq( kanelaAgent % "provided", - "org.typelevel" %% "cats-effect" % "3.3.14" % "provided", +// "org.typelevel" %% "cats-effect" % "3.3.14" % "provided", + "org.typelevel" %% "cats-effect" % "3.5.4" % "provided", scalatest % "test", logbackClassic % "test" ), diff --git a/instrumentation/kamon-cats-io-3/src/main/java/kamon/instrumentation/cats3/CleanSchedulerContextAdvice35.java b/instrumentation/kamon-cats-io-3/src/main/java/kamon/instrumentation/cats3/CleanSchedulerContextAdvice35.java new file mode 100644 index 000000000..01e322c72 --- /dev/null +++ b/instrumentation/kamon-cats-io-3/src/main/java/kamon/instrumentation/cats3/CleanSchedulerContextAdvice35.java @@ -0,0 +1,35 @@ +package kamon.instrumentation.cats3; + +import kamon.Kamon; +import kamon.context.Context; +import kamon.context.Storage; +import kanela.agent.libs.net.bytebuddy.asm.Advice; +import scala.Function1; +import scala.runtime.BoxedUnit; +import scala.runtime.Nothing$; +import scala.util.Right; + +public class CleanSchedulerContextAdvice35 { + @Advice.OnMethodEnter + public static void enter(@Advice.Argument(value = 1, readOnly = false) Function1, BoxedUnit> callback) { + callback = new CleanSchedulerContextAdvice35.ContextCleaningWrapper(callback, Kamon.currentContext()); + } + + + public static class ContextCleaningWrapper implements Function1, BoxedUnit> { + private final Function1, BoxedUnit> runnable; + private final Context context; + + public ContextCleaningWrapper(Function1, BoxedUnit> runnable, Context context) { + this.runnable = runnable; + this.context = context; + } + + @Override + public BoxedUnit apply(Right v1) { + try (Storage.Scope ignored = Kamon.storeContext(context)) { + return runnable.apply(v1); + } + } + } +} diff --git a/instrumentation/kamon-cats-io-3/src/main/resources/reference.conf b/instrumentation/kamon-cats-io-3/src/main/resources/reference.conf index 67b4a87e4..ada40cc0f 100644 --- a/instrumentation/kamon-cats-io-3/src/main/resources/reference.conf +++ b/instrumentation/kamon-cats-io-3/src/main/resources/reference.conf @@ -12,7 +12,9 @@ kanela.modules { within = [ "cats\\.effect\\.IOFiber", + "cats\\.effect\\.std\\.Dispatcher", + "cats\\.effect\\.unsafe\\.WorkStealingThreadPool", "cats\\.effect\\.unsafe\\.SchedulerCompanionPlatform.*" ] } -} \ No newline at end of file +} diff --git a/instrumentation/kamon-cats-io-3/src/main/scala/cats/effect/kamonCats/PackageAccessor.scala b/instrumentation/kamon-cats-io-3/src/main/scala/cats/effect/kamonCats/PackageAccessor.scala new file mode 100644 index 000000000..b1ae9fc0d --- /dev/null +++ b/instrumentation/kamon-cats-io-3/src/main/scala/cats/effect/kamonCats/PackageAccessor.scala @@ -0,0 +1,41 @@ +package cats.effect.kamonCats + +import org.slf4j.LoggerFactory + +/** + * Utility class to make accessing some internals from Kamon, more accessible. + * + */ +object PackageAccessor { + + private val LOG = LoggerFactory.getLogger(getClass) + + /** + * This uses reflection to get the objectState, which acts like a stack + * of effects so we can determine something about the lineage of a particular fiber. + * + * @param fiber A runnable or IOFiber + * @return + */ + def fiberObjectStackBuffer(fiber: Any): Array[AnyRef] = { + try { + val field = fiber.getClass.getDeclaredField("objectState") + field.setAccessible(true) + field.get(fiber).asInstanceOf[cats.effect.ArrayStack[AnyRef]].unsafeBuffer() + } catch { + case _: Exception => + if (LOG.isWarnEnabled) + LOG.warn("Unable to get the object stack buffer.") + Array.empty + } + + } + + /** This frankly kinda isn't great, but I couldn't figure out how to do this */ + def isDispatcherWorker(obj: AnyRef): Boolean = { + if (obj != null) + obj.getClass.getName.contains("Dispatcher$Worker") + else false + } + +} diff --git a/instrumentation/kamon-cats-io-3/src/main/scala/kamon/instrumentation/cats3/IOFiberInstrumentation.scala b/instrumentation/kamon-cats-io-3/src/main/scala/kamon/instrumentation/cats3/IOFiberInstrumentation.scala index b97156216..e42021ee2 100644 --- a/instrumentation/kamon-cats-io-3/src/main/scala/kamon/instrumentation/cats3/IOFiberInstrumentation.scala +++ b/instrumentation/kamon-cats-io-3/src/main/scala/kamon/instrumentation/cats3/IOFiberInstrumentation.scala @@ -1,7 +1,7 @@ package kamon.instrumentation.cats3 - +import cats.effect.kamonCats.PackageAccessor import kamon.Kamon -import kamon.context.Storage.Scope +import kamon.context.Storage import kamon.instrumentation.context.HasContext import kanela.agent.api.instrumentation.InstrumentationBuilder import kanela.agent.libs.net.bytebuddy.asm.Advice @@ -12,8 +12,9 @@ class IOFiberInstrumentation extends InstrumentationBuilder { onType("cats.effect.IOFiber") .mixin(classOf[HasContext.Mixin]) + .mixin(classOf[HasStorage.Mixin]) .advise(isConstructor.and(takesArguments(5).or(takesArguments(3))), AfterFiberInit) - .advise(method("suspend"), SaveCurrentContextOnExit) + .advise(method("suspend"), SaveCurrentContextOnSuspend) .advise(method("resume"), RestoreContextOnSuccessfulResume) .advise(method("run"), RunLoopWithContext) @@ -31,16 +32,18 @@ class IOFiberInstrumentation extends InstrumentationBuilder { ) onTypes("cats.effect.unsafe.WorkStealingThreadPool") + .advise(method("sleepInternal"), classOf[CleanSchedulerContextAdvice35]) // > 3.3 .advise( anyMethods( - "scheduleFiber", - "rescheduleFiber", + "scheduleFiber", // <3.4 + "rescheduleFiber", // <3.4 + "reschedule", "scheduleExternal" ), SetContextOnNewFiberForWSTP ) - // Scheduled actions like `IO.sleep` end up calling `resume` from the scheduler thread, + // For < 3.4 cats, Scheduled actions like `IO.sleep` end up calling `resume` from the scheduler thread, // which always leaves a dirty thread. This wrapper ensures that scheduled actions are // executed with the same Context that was available when they were scheduled, and then // reset the scheduler thread to the empty context. @@ -48,6 +51,38 @@ class IOFiberInstrumentation extends InstrumentationBuilder { .advise(method("sleep"), classOf[CleanSchedulerContextAdvice]) } +/** + * Mixin that exposes access to the scope captured by an instrumented instance. + * The interface exposes means of getting and more importantly closing of the + * scope. + */ +trait HasStorage { + + /** + * Returns the [[Storage.Scope]] stored in the instrumented instance. + */ + def kamonScope: Storage.Scope + + /** + * Updates the [[Storage.Scope]] stored in the instrumented instance + */ + def setKamonScope(scope: Storage.Scope): Unit + +} + +object HasStorage { + + /** + * [[HasStorage]] implementation that keeps the scope in a mutable field. + */ + class Mixin(@transient private var _scope: Storage.Scope) extends HasStorage { + + override def kamonScope: Storage.Scope = if (_scope != null) _scope else Storage.Scope.Empty + + override def setKamonScope(scope: Storage.Scope): Unit = _scope = scope + } +} + class AfterFiberInit object AfterFiberInit { @@ -61,13 +96,13 @@ class RunLoopWithContext object RunLoopWithContext { @Advice.OnMethodEnter() - @static def enter(@Advice.This fiber: Any): Scope = { + @static def enter(@Advice.This fiber: Any): Storage.Scope = { val ctxFiber = fiber.asInstanceOf[HasContext].context Kamon.storeContext(ctxFiber) } @Advice.OnMethodExit() - @static def exit(@Advice.Enter scope: Scope, @Advice.This fiber: Any): Unit = { + @static def exit(@Advice.Enter scope: Storage.Scope, @Advice.This fiber: Any): Unit = { val leftContext = Kamon.currentContext() scope.close() @@ -80,19 +115,41 @@ object RestoreContextOnSuccessfulResume { @Advice.OnMethodExit() @static def exit(@Advice.This fiber: Any, @Advice.Return wasSuspended: Boolean): Unit = { - if (wasSuspended) { - val ctxFiber = fiber.asInstanceOf[HasContext].context - Kamon.storeContext(ctxFiber) + + // Resume is tricky, most of the time we want to keep the `wasSuspended` behavior, + // but there's a single issue with Dispatcher, basically it resumes differently, so + // we try to catch that case and identify it and do something different with it. + val dispatcherIsRunningDirectly = + PackageAccessor.fiberObjectStackBuffer(fiber).exists(PackageAccessor.isDispatcherWorker) + + val fi = fiber.asInstanceOf[HasContext with HasStorage] + + val setContext = wasSuspended && !dispatcherIsRunningDirectly + if (setContext) { + fi.setKamonScope(Kamon.storeContext(fi.context)) + } else if (dispatcherIsRunningDirectly) { + fi.setContext(Kamon.currentContext()) } } } -class SaveCurrentContextOnExit -object SaveCurrentContextOnExit { +class SaveCurrentContextOnSuspend +object SaveCurrentContextOnSuspend { @Advice.OnMethodExit() @static def exit(@Advice.This fiber: Any): Unit = { - fiber.asInstanceOf[HasContext].setContext(Kamon.currentContext()) + val fi = fiber.asInstanceOf[HasContext with HasStorage] + fi.setContext(Kamon.currentContext()) + } +} + +class CleanFiberUp +object CleanFiberUp { + + @Advice.OnMethodExit() + @static def exit(@Advice.This fiber: Any): Unit = { + val fi = fiber.asInstanceOf[HasContext with HasStorage] + fi.kamonScope.close() } } @@ -108,7 +165,6 @@ class SetContextOnNewFiberForWSTP object SetContextOnNewFiberForWSTP { @Advice.OnMethodEnter() - @static def enter(@Advice.Argument(0) fiber: Any): Unit = { + @static def enter(@Advice.Argument(0) fiber: Any): Unit = fiber.asInstanceOf[HasContext].setContext(Kamon.currentContext()) - } } diff --git a/instrumentation/kamon-cats-io-3/src/test/scala/kamon/instrumentation/futures/cats3/CatsIoInstrumentationSpec.scala b/instrumentation/kamon-cats-io-3/src/test/scala/kamon/instrumentation/futures/cats3/CatsIoInstrumentationSpec.scala index 85a8bfcd6..a9c22c213 100644 --- a/instrumentation/kamon-cats-io-3/src/test/scala/kamon/instrumentation/futures/cats3/CatsIoInstrumentationSpec.scala +++ b/instrumentation/kamon-cats-io-3/src/test/scala/kamon/instrumentation/futures/cats3/CatsIoInstrumentationSpec.scala @@ -1,7 +1,9 @@ package kamon.instrumentation.futures.cats3 +import cats.Parallel +import cats.effect.std.Dispatcher import cats.effect.unsafe.{IORuntime, IORuntimeConfig, Scheduler} -import cats.effect.{IO, Resource, Spawn} +import cats.effect.{Async, IO, Resource, Spawn, Sync} import kamon.Kamon import kamon.context.Context import kamon.tag.Lookups.plain @@ -18,6 +20,8 @@ import cats.implicits._ import kamon.trace.Identifier.Scheme import kamon.trace.{Identifier, Span, Trace} +import scala.util.{Failure, Success} + class CatsIoInstrumentationSpec extends AnyWordSpec with Matchers with ScalaFutures with PatienceConfiguration with OptionValues with Eventually with BeforeAndAfterEach { @@ -58,6 +62,126 @@ class CatsIoInstrumentationSpec extends AnyWordSpec with Matchers with ScalaFutu context shouldBe contextInsideYield } + "must handle async boundaries" in { + val runtime = IORuntime.global + val anotherExecutionContext: ExecutionContext = + ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(10)) + val context = Context.of("key", "value") + + def asyncFunction[A](dur: FiniteDuration)(io: IO[A]) = IO.async[A] { cb => + val cancel = runtime.scheduler.sleep( + dur, + { () => + cb(Right(io.unsafeRunSync()(runtime))) + } + ) + IO(Some(IO(cancel.run()))) + } + + val test = + for { + _ <- IO.delay(Kamon.storeContext(context)) + _ <- IO.sleep(10.millis) + beforeCleaning <- IO.delay(Kamon.currentContext()) + beforeCleaningAsync <- asyncFunction(10.millis) { + IO(Kamon.currentContext()) + } + _ <- IO.delay(Kamon.storeContext(Context.Empty)) + + _ <- Spawn[IO].evalOn(IO.sleep(10.millis), anotherExecutionContext) + afterCleaning <- IO.delay(Kamon.currentContext()) + } yield { + afterCleaning shouldBe Context.Empty + beforeCleaning shouldBe context + beforeCleaningAsync shouldBe context + } + + test.unsafeRunSync()(runtime) + } + + "must handle async boundaries - F" in { + val runtime = IORuntime.global + def test[F[_]: Async](dispatcher: Dispatcher[F]) = { + val runtime = IORuntime.global + val anotherExecutionContext: ExecutionContext = + ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(10)) + val context = Context.of("key", "value") + + def asyncFunction[A](dur: FiniteDuration)(io: F[A]) = Async[F].async[A] { cb => + val cancel = runtime.scheduler.sleep( + dur, + { () => + dispatcher.unsafeToFuture(io).onComplete { + case Failure(exception) => cb(Left(exception)) + case Success(value) => cb(Right(value)) + }(anotherExecutionContext) + } + ) + Async[F].delay(Some(Async[F].delay(cancel.run()))) + } + + for { + _ <- Async[F].delay(Kamon.storeContext(context)) + _ <- Async[F].sleep(10.millis) + beforeCleaning <- Async[F].delay(Kamon.currentContext()) + beforeCleaningAsync <- asyncFunction(10.millis) { + Async[F].delay(Kamon.currentContext()) + } + _ <- Async[F].delay(Kamon.storeContext(Context.Empty)) + + _ <- Spawn[F].evalOn(Async[F].sleep(10.millis), anotherExecutionContext) + afterCleaning <- Async[F].delay(Kamon.currentContext()) + } yield { + afterCleaning shouldBe Context.Empty + beforeCleaning shouldBe context + beforeCleaningAsync shouldBe context + } + + } + + Dispatcher.parallel[IO](true).use { dispatcher => + test[IO](dispatcher) + }.unsafeRunSync()(runtime) + } + + "must correctly use dispatchers" in { + val runtime = IORuntime.global + def test[F[_]: Async](dispatcher: Dispatcher[F]) = { + val anotherExecutionContext: ExecutionContext = + ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(10)) + val context = Context.of("key", "value") + + def asyncFunction[A](io: F[A]) = Async[F].async[A] { cb => + dispatcher.unsafeToFuture(io).onComplete { + case Failure(exception) => cb(Left(exception)) + case Success(value) => cb(Right(value)) + }(anotherExecutionContext) + Async[F].delay(None) + } + + for { + _ <- Async[F].delay(Kamon.storeContext(context)) + beforeCleaningAsync <- asyncFunction { + Async[F].delay(Kamon.currentContext()) + } + _ <- Async[F].delay(Kamon.storeContext(Context.Empty)) + } yield { + beforeCleaningAsync shouldBe context + } + } + + Vector( + Dispatcher.sequential[IO](false), + Dispatcher.sequential[IO](true), + Dispatcher.parallel[IO](false), + Dispatcher.parallel[IO](true) + ).traverse(_.use { dispatcher => + withClue(s"Dispatcher $dispatcher") { + test[IO](dispatcher) + } + }).unsafeRunSync()(runtime) + } + "must allow the context to be cleaned" in { val runtime = IORuntime.global val anotherExecutionContext: ExecutionContext = @@ -80,6 +204,28 @@ class CatsIoInstrumentationSpec extends AnyWordSpec with Matchers with ScalaFutu test.unsafeRunSync()(runtime) } + "must allow the context to be cleaned - F" in { + val runtime = IORuntime.global + val anotherExecutionContext: ExecutionContext = + ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(10)) + val context = Context.of("key", "value") + + def test[F[_]: Async] = + for { + _ <- Async[F].delay(Kamon.storeContext(context)) + _ <- Spawn[F].evalOn(Async[F].sleep(10.millis), anotherExecutionContext) + beforeCleaning <- Async[F].delay(Kamon.currentContext()) + _ <- Async[F].delay(Kamon.storeContext(Context.Empty)) + _ <- Spawn[F].evalOn(Async[F].sleep(10.millis), anotherExecutionContext) + afterCleaning <- Async[F].delay(Kamon.currentContext()) + } yield { + afterCleaning shouldBe Context.Empty + beforeCleaning shouldBe context + } + + test[IO].unsafeRunSync()(runtime) + } + "must be available across asynchronous boundaries" in { val runtime = IORuntime.apply( ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)), // pool 4 @@ -119,6 +265,46 @@ class CatsIoInstrumentationSpec extends AnyWordSpec with Matchers with ScalaFutu test.unsafeRunSync()(runtime) } + "must be available across asynchronous boundaries - F" in { + val runtime = IORuntime.apply( + ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)), // pool 4 + ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)), // pool 5 + Scheduler.fromScheduledExecutor(Executors.newSingleThreadScheduledExecutor()), // pool 6 + () => (), + IORuntimeConfig.apply() + ) + val anotherExecutionContext: ExecutionContext = + ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1)) // pool 7 + val context = Context.of("key", "value") + def test[F[_]: Async] = + for { + scope <- Async[F].delay(Kamon.storeContext(context)) + len <- Async[F].delay("Hello Kamon!").map(_.length) + _ <- Async[F].delay(len.toString) + beforeChanging <- getKeyF[F]() + evalOnGlobalRes <- Spawn[F].evalOn(Async[F].sleep(Duration.Zero).flatMap(_ => getKeyF[F]()), global) + outerSpanIdBeginning <- Async[F].delay(Kamon.currentSpan().id.string) + innerSpan <- Async[F].delay(Kamon.clientSpanBuilder("Foo", "attempt").context(context).start()) + innerSpanId1 <- Spawn[F].evalOn(Async[F].delay(Kamon.currentSpan()), anotherExecutionContext) + innerSpanId2 <- Async[F].delay(Kamon.currentSpan()) + _ <- Async[F].delay(innerSpan.finish()) + outerSpanIdEnd <- Async[F].delay(Kamon.currentSpan().id.string) + evalOnAnotherEx <- + Spawn[F].evalOn(Async[F].sleep(Duration.Zero).flatMap(_ => getKeyF[F]()), anotherExecutionContext) + } yield { + scope.close() + withClue("before changing")(beforeChanging shouldBe "value") + withClue("on the global exec context")(evalOnGlobalRes shouldBe "value") + withClue("on a different exec context")(evalOnAnotherEx shouldBe "value") + withClue("final result")(evalOnAnotherEx shouldBe "value") + withClue("inner span should be the same on different exec")(innerSpanId1 shouldBe innerSpan) + withClue("inner span should be the same on same exec")(innerSpanId2 shouldBe innerSpan) + withClue("inner and outer should be different")(outerSpanIdBeginning should not equal innerSpan) + } + + test[IO].unsafeRunSync()(runtime) + } + "must allow complex Span topologies to be created" in { val parentSpan = Span.Remote( Scheme.Single.spanIdFactory.generate(), @@ -163,7 +349,52 @@ class CatsIoInstrumentationSpec extends AnyWordSpec with Matchers with ScalaFutu ) Await.result(result, 100.seconds) } + + "must allow complex Span topologies to be created - F" in { + val parentSpan = Span.Remote( + Scheme.Single.spanIdFactory.generate(), + Identifier.Empty, + Trace.create(Scheme.Single.traceIdFactory.generate(), Trace.SamplingDecision.Sample) + ) + val context = Context.of(Span.Key, parentSpan) + implicit val ec = ExecutionContext.global + + /** + * test + * - nestedLevel0 + * - nestedUpToLevel2 + * - nestedUpToLevel2._2._1 + * - fiftyInParallel + */ + def test[F[_]: Async: Parallel] = for { + span <- Async[F].delay(Kamon.currentSpan()) + nestedLevel0 <- meteredWithSpanCapture("level1-A")(Async[F].sleep(100.millis)) + nestedUpToLevel2 <- + meteredWithSpanCapture("level1-B")(meteredWithSpanCapture("level2-B")(Async[F].sleep(100.millis))) + fiftyInParallel <- + (0 to 49).toList.parTraverse(i => meteredWithSpanCapture(s"operation$i")(Async[F].sleep(100.millis))) + afterCede <- meteredWithSpanCapture("cede")(Async[F].cede *> Async[F].delay(Kamon.currentSpan())) + afterEverything <- Async[F].delay(Kamon.currentSpan()) + } yield { + span.id.string should not be empty + span.id.string shouldBe nestedLevel0._1.parentId.string + span.id.string shouldBe nestedUpToLevel2._1.parentId.string + nestedUpToLevel2._1.id.string shouldBe nestedUpToLevel2._2._1.parentId.string + fiftyInParallel.map(_._1.parentId.string).toSet shouldBe Set(span.id.string) + fiftyInParallel.map(_._1.id.string).toSet should have size 50 + afterCede._1.id.string shouldBe afterCede._2.id.string // A cede should not cause the span to be lost + afterEverything.id.string shouldBe span.id.string + } + val runtime = IORuntime.global + + val result = (1 to 100).toList + .parTraverse(_ => IO.delay(Kamon.init()) *> IO.delay(Kamon.storeContext(context)) *> test[IO]) + .unsafeToFuture()(runtime) + + Await.result(result, 100.seconds) + } } + } override protected def afterEach(): Unit = { @@ -176,21 +407,25 @@ class CatsIoInstrumentationSpec extends AnyWordSpec with Matchers with ScalaFutu IO.delay(Kamon.currentContext().getTag(plain("key"))) } - private def meteredWithSpanCapture[A](operation: String)(io: IO[A]): IO[(Span, A)] = { + private def getKeyF[F[_]: Sync](): F[String] = { + Sync[F].delay(Kamon.currentContext().getTag(plain("key"))) + } + + private def meteredWithSpanCapture[F[_]: Sync: Parallel, A](operation: String)(io: F[A]): F[(Span, A)] = { Resource.make { for { - initialCtx <- IO(Kamon.currentContext()) - parentSpan <- IO(Kamon.currentSpan()) - newSpan <- IO(Kamon.spanBuilder(operation).context(initialCtx).asChildOf(parentSpan).start()) - _ <- IO(Kamon.storeContext(initialCtx.withEntry(Span.Key, newSpan))) + initialCtx <- Sync[F].delay(Kamon.currentContext()) + parentSpan <- Sync[F].delay(Kamon.currentSpan()) + newSpan <- Sync[F].delay(Kamon.spanBuilder(operation).context(initialCtx).asChildOf(parentSpan).start()) + _ <- Sync[F].delay(Kamon.storeContext(initialCtx.withEntry(Span.Key, newSpan))) } yield (initialCtx, newSpan) } { case (initialCtx, span) => for { - _ <- IO.delay(span.finish()) - _ <- IO.delay(Kamon.storeContext(initialCtx)) + _ <- Sync[F].delay(span.finish()) + _ <- Sync[F].delay(Kamon.storeContext(initialCtx)) } yield () } - .use(_ => (IO.delay(Kamon.currentSpan()), io).parBisequence) + .use(_ => (Sync[F].delay(Kamon.currentSpan()), io).parBisequence) } }