Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cats-effect-3: Attempt to handle the Dispatcher case #1347

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ lazy val `kamon-cats-io-3` = (project in file("instrumentation/kamon-cats-io-3")
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version),
libraryDependencies ++= Seq(
kanelaAgent % "provided",
"org.typelevel" %% "cats-effect" % "3.3.14" % "provided",
// "org.typelevel" %% "cats-effect" % "3.3.14" % "provided",
"org.typelevel" %% "cats-effect" % "3.5.4" % "provided",
scalatest % "test",
logbackClassic % "test"
),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package kamon.instrumentation.cats3;

import kamon.Kamon;
import kamon.context.Context;
import kamon.context.Storage;
import kanela.agent.libs.net.bytebuddy.asm.Advice;
import scala.Function1;
import scala.runtime.BoxedUnit;
import scala.runtime.Nothing$;
import scala.util.Right;

public class CleanSchedulerContextAdvice35 {
@Advice.OnMethodEnter
public static void enter(@Advice.Argument(value = 1, readOnly = false) Function1<Right<Nothing$, BoxedUnit>, BoxedUnit> callback) {
callback = new CleanSchedulerContextAdvice35.ContextCleaningWrapper(callback, Kamon.currentContext());
}


public static class ContextCleaningWrapper implements Function1<Right<Nothing$, BoxedUnit>, BoxedUnit> {
private final Function1<Right<Nothing$, BoxedUnit>, BoxedUnit> runnable;
private final Context context;

public ContextCleaningWrapper(Function1<Right<Nothing$, BoxedUnit>, BoxedUnit> runnable, Context context) {
this.runnable = runnable;
this.context = context;
}

@Override
public BoxedUnit apply(Right<Nothing$, BoxedUnit> v1) {
try (Storage.Scope ignored = Kamon.storeContext(context)) {
return runnable.apply(v1);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ kanela.modules {

within = [
"cats\\.effect\\.IOFiber",
"cats\\.effect\\.std\\.Dispatcher",
"cats\\.effect\\.unsafe\\.WorkStealingThreadPool",
"cats\\.effect\\.unsafe\\.SchedulerCompanionPlatform.*"
]
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package cats.effect.kamonCats

import org.slf4j.LoggerFactory

/**
* Utility class to make accessing some internals from Kamon, more accessible.
*
*/
object PackageAccessor {

private val LOG = LoggerFactory.getLogger(getClass)

/**
* This uses reflection to get the objectState, which acts like a stack
* of effects so we can determine something about the lineage of a particular fiber.
*
* @param fiber A runnable or IOFiber
* @return
*/
def fiberObjectStackBuffer(fiber: Any): Array[AnyRef] = {
try {
val field = fiber.getClass.getDeclaredField("objectState")
field.setAccessible(true)
field.get(fiber).asInstanceOf[cats.effect.ArrayStack[AnyRef]].unsafeBuffer()
} catch {
case _: Exception =>
if (LOG.isWarnEnabled)
LOG.warn("Unable to get the object stack buffer.")
Array.empty
}

}

/** This frankly kinda isn't great, but I couldn't figure out how to do this */
def isDispatcherWorker(obj: AnyRef): Boolean = {
if (obj != null)
obj.getClass.getName.contains("Dispatcher$Worker")
else false
}

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package kamon.instrumentation.cats3

import cats.effect.kamonCats.PackageAccessor
import kamon.Kamon
import kamon.context.Storage.Scope
import kamon.context.Storage
import kamon.instrumentation.context.HasContext
import kanela.agent.api.instrumentation.InstrumentationBuilder
import kanela.agent.libs.net.bytebuddy.asm.Advice
Expand All @@ -12,8 +12,9 @@ class IOFiberInstrumentation extends InstrumentationBuilder {

onType("cats.effect.IOFiber")
.mixin(classOf[HasContext.Mixin])
.mixin(classOf[HasStorage.Mixin])
.advise(isConstructor.and(takesArguments(5).or(takesArguments(3))), AfterFiberInit)
.advise(method("suspend"), SaveCurrentContextOnExit)
.advise(method("suspend"), SaveCurrentContextOnSuspend)
.advise(method("resume"), RestoreContextOnSuccessfulResume)
.advise(method("run"), RunLoopWithContext)

Expand All @@ -31,23 +32,57 @@ class IOFiberInstrumentation extends InstrumentationBuilder {
)

onTypes("cats.effect.unsafe.WorkStealingThreadPool")
.advise(method("sleepInternal"), classOf[CleanSchedulerContextAdvice35]) // > 3.3
.advise(
anyMethods(
"scheduleFiber",
"rescheduleFiber",
"scheduleFiber", // <3.4
"rescheduleFiber", // <3.4
"reschedule",
"scheduleExternal"
),
SetContextOnNewFiberForWSTP
)

// Scheduled actions like `IO.sleep` end up calling `resume` from the scheduler thread,
// For < 3.4 cats, Scheduled actions like `IO.sleep` end up calling `resume` from the scheduler thread,
// which always leaves a dirty thread. This wrapper ensures that scheduled actions are
// executed with the same Context that was available when they were scheduled, and then
// reset the scheduler thread to the empty context.
onSubTypesOf("cats.effect.unsafe.Scheduler")
.advise(method("sleep"), classOf[CleanSchedulerContextAdvice])
}

/**
* Mixin that exposes access to the scope captured by an instrumented instance.
* The interface exposes means of getting and more importantly closing of the
* scope.
*/
trait HasStorage {

/**
* Returns the [[Storage.Scope]] stored in the instrumented instance.
*/
def kamonScope: Storage.Scope

/**
* Updates the [[Storage.Scope]] stored in the instrumented instance
*/
def setKamonScope(scope: Storage.Scope): Unit

}

object HasStorage {

/**
* [[HasStorage]] implementation that keeps the scope in a mutable field.
*/
class Mixin(@transient private var _scope: Storage.Scope) extends HasStorage {

override def kamonScope: Storage.Scope = if (_scope != null) _scope else Storage.Scope.Empty

override def setKamonScope(scope: Storage.Scope): Unit = _scope = scope
}
}

class AfterFiberInit
object AfterFiberInit {

Expand All @@ -61,13 +96,13 @@ class RunLoopWithContext
object RunLoopWithContext {

@Advice.OnMethodEnter()
@static def enter(@Advice.This fiber: Any): Scope = {
@static def enter(@Advice.This fiber: Any): Storage.Scope = {
val ctxFiber = fiber.asInstanceOf[HasContext].context
Kamon.storeContext(ctxFiber)
}

@Advice.OnMethodExit()
@static def exit(@Advice.Enter scope: Scope, @Advice.This fiber: Any): Unit = {
@static def exit(@Advice.Enter scope: Storage.Scope, @Advice.This fiber: Any): Unit = {
val leftContext = Kamon.currentContext()
scope.close()

Expand All @@ -80,19 +115,41 @@ object RestoreContextOnSuccessfulResume {

@Advice.OnMethodExit()
@static def exit(@Advice.This fiber: Any, @Advice.Return wasSuspended: Boolean): Unit = {
if (wasSuspended) {
val ctxFiber = fiber.asInstanceOf[HasContext].context
Kamon.storeContext(ctxFiber)

// Resume is tricky, most of the time we want to keep the `wasSuspended` behavior,
// but there's a single issue with Dispatcher, basically it resumes differently, so
// we try to catch that case and identify it and do something different with it.
val dispatcherIsRunningDirectly =
PackageAccessor.fiberObjectStackBuffer(fiber).exists(PackageAccessor.isDispatcherWorker)

val fi = fiber.asInstanceOf[HasContext with HasStorage]

val setContext = wasSuspended && !dispatcherIsRunningDirectly
if (setContext) {
fi.setKamonScope(Kamon.storeContext(fi.context))
} else if (dispatcherIsRunningDirectly) {
fi.setContext(Kamon.currentContext())
}
}
}

class SaveCurrentContextOnExit
object SaveCurrentContextOnExit {
class SaveCurrentContextOnSuspend
object SaveCurrentContextOnSuspend {

@Advice.OnMethodExit()
@static def exit(@Advice.This fiber: Any): Unit = {
fiber.asInstanceOf[HasContext].setContext(Kamon.currentContext())
val fi = fiber.asInstanceOf[HasContext with HasStorage]
fi.setContext(Kamon.currentContext())
}
}

class CleanFiberUp
object CleanFiberUp {

@Advice.OnMethodExit()
@static def exit(@Advice.This fiber: Any): Unit = {
val fi = fiber.asInstanceOf[HasContext with HasStorage]
fi.kamonScope.close()
}
}

Expand All @@ -108,7 +165,6 @@ class SetContextOnNewFiberForWSTP
object SetContextOnNewFiberForWSTP {

@Advice.OnMethodEnter()
@static def enter(@Advice.Argument(0) fiber: Any): Unit = {
@static def enter(@Advice.Argument(0) fiber: Any): Unit =
fiber.asInstanceOf[HasContext].setContext(Kamon.currentContext())
}
}
Loading
Loading