Skip to content

Commit

Permalink
Add kamon-pekko instrumentation (#1264)
Browse files Browse the repository at this point in the history
* Add kamon-pekko module for Apache Pekko support

Replace with advice should be in correct package

* Use released 1.0.0 for Apache Pekko

* Update sbt to 1.9.2

* Update Pekko to 1.0.1

https://pekko.apache.org/docs/pekko/1.0/release-notes/index.html#1-0-1

* Change scaladoc references from Akka to Pekko
  • Loading branch information
DieBauer authored Aug 9, 2023
1 parent cebcc5a commit 2570a11
Show file tree
Hide file tree
Showing 66 changed files with 9,240 additions and 1 deletion.
9 changes: 9 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,15 @@ lazy val `kamon-akka-http` = (project in file("instrumentation/kamon-akka-http")
),
)).dependsOn(`kamon-akka`, `kamon-testkit` % "test")


lazy val `kamon-pekko` = (project in file("instrumentation/kamon-pekko"))
.enablePlugins(JavaAgent)
.disablePlugins(AssemblyPlugin)
.settings(instrumentationSettings: _*)
.dependsOn(
`kamon-scala-future` % "compile",
`kamon-testkit` % "test"
)
lazy val `kamon-akka-grpc` = (project in file("instrumentation/kamon-akka-grpc"))
.enablePlugins(JavaAgent, AkkaGrpcPlugin)
.disablePlugins(AssemblyPlugin)
Expand Down
37 changes: 37 additions & 0 deletions instrumentation/kamon-pekko/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// The Common configuration should always depend on the latest version of Pekko. All code in the Common configuration
// should be source compatible with all Pekko versions.
inConfig(Compile)(Defaults.compileSettings ++ Seq(
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`)
))

val pekkoVersion = "1.0.1"
libraryDependencies ++= { if(scalaBinaryVersion.value == "2.11") Seq.empty else Seq(
kanelaAgent,
scalatest % Test,
logbackClassic % Test,
"org.apache.pekko" %% "pekko-actor" % pekkoVersion,
"org.apache.pekko" %% "pekko-testkit" % pekkoVersion,
"org.apache.pekko" %% "pekko-slf4j" % pekkoVersion,
"org.apache.pekko" %% "pekko-remote" % pekkoVersion,
"org.apache.pekko" %% "pekko-cluster" % pekkoVersion,
"org.apache.pekko" %% "pekko-cluster-sharding" % pekkoVersion,
"org.apache.pekko" %% "pekko-protobuf" % pekkoVersion,
"org.apache.pekko" %% "pekko-testkit" % pekkoVersion % Test
)}

exportJars := true

/**
* Test-related settings
*/

lazy val baseTestSettings = Seq(
fork := true,
parallelExecution := false,
javaOptions := (Test / javaOptions).value,
dependencyClasspath += (Compile / packageBin).value
)

inConfig(Test)(Defaults.testSettings ++ instrumentationSettings ++ baseTestSettings ++ Seq(
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`)
))
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package kamon.instrumentation.pekko.instrumentations;

import org.apache.pekko.dispatch.Envelope;
import kamon.context.Context;
import kamon.instrumentation.context.HasContext;
import kamon.instrumentation.context.HasTimestamp;
import kanela.agent.libs.net.bytebuddy.asm.Advice;

final public class ActorCellInvokeAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void enter(
@Advice.This Object cell,
@Advice.Argument(0) Object envelope,
@Advice.Local("stateFromStart") Object stateFromStart,
@Advice.Local("processingStartTimestamp") Long processingStartTimestamp,
@Advice.Local("envelopeTimestamp") Long envelopeTimestamp,
@Advice.Local("context") Context context) {

final ActorMonitor actorMonitor = ((HasActorMonitor) cell).actorMonitor();

processingStartTimestamp = actorMonitor.captureProcessingStartTimestamp();
context = ((HasContext) envelope).context();
envelopeTimestamp = ((HasTimestamp) envelope).timestamp();
stateFromStart = actorMonitor.onMessageProcessingStart(context, envelopeTimestamp, (Envelope) envelope);
}

@Advice.OnMethodExit(suppress = Throwable.class)
public static void exit(
@Advice.This Object cell,
@Advice.Local("stateFromStart") Object stateFromStart,
@Advice.Local("processingStartTimestamp") Long processingStartTimestamp,
@Advice.Local("envelopeTimestamp") Long envelopeTimestamp,
@Advice.Local("context") Context context) {

final ActorMonitor actorMonitor = ((HasActorMonitor) cell).actorMonitor();
actorMonitor.onMessageProcessingEnd(context, envelopeTimestamp, processingStartTimestamp, stateFromStart);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package kamon.instrumentation.pekko.instrumentations;

import org.apache.pekko.actor.*;
import org.apache.pekko.dispatch.Mailbox;
import org.apache.pekko.dispatch.sysmsg.SystemMessage;
import org.apache.pekko.pattern.PromiseActorRef;
import org.apache.pekko.routing.RoutedActorCell;
import org.apache.pekko.routing.RoutedActorRef;
import scala.Option;

/**
* This class exposes access to several private[pekko] members that wouldn't be visible from the Scala codebase.
*/
public class PekkoPrivateAccess {

public static boolean isSystemMessage(Object message) {
return message instanceof SystemMessage;
}

public static boolean isPromiseActorRef(ActorRef ref) {
return ref instanceof PromiseActorRef;
}

public static boolean isInternalAndActiveActorRef(ActorRef target) {
return target != null && target instanceof InternalActorRef && !((InternalActorRef) target).isTerminated();
}

public static boolean isRoutedActorRef(ActorRef target) {
return target instanceof RoutedActorRef;
}

public static boolean isRoutedActorCell(Object cell) {
return cell instanceof RoutedActorCell;
}

public static boolean isUnstartedActorCell(Object cell) {
return cell instanceof UnstartedCell;
}

public static Class<?> unstartedActorCellClass() {
return UnstartedCell.class;
}

public static boolean isDeadLettersMailbox(Object cell, Object mailbox) {
final ActorCell actorCell = (ActorCell) cell;
return mailbox == actorCell.dispatcher().mailboxes().deadLetterMailbox();
}

public static long mailboxMessageCount(Object mailbox) {
return ((Mailbox) mailbox).numberOfMessages();
}

public static Option<Props> cellProps(Object cell) {
if(cell != null && cell instanceof Cell)
return Option.apply(((Cell) cell).props());
else
return Option.empty();
}

public static Option<Deploy> lookupDeploy(ActorPath path, ActorSystem system) {
final Deployer deployer = new Deployer(system.settings(), ((ExtendedActorSystem) system).dynamicAccess());
return deployer.lookup(path.$div("$a"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/* =========================================================================================
* Copyright © 2013-2022 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
* =========================================================================================
*/

package kamon.instrumentation.pekko.instrumentations;

import kamon.Kamon;
import kamon.context.Context;
import kamon.context.Storage;
import kanela.agent.libs.net.bytebuddy.asm.Advice;

public class SchedulerRunnableAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void enter(@Advice.Argument(value = 1, readOnly = false) Runnable runnable) {
runnable = new ContextAwareRunnable(Kamon.currentContext(), runnable);
}

public static class ContextAwareRunnable implements Runnable {
private final Context context;
private final Runnable underlyingRunnable;

public ContextAwareRunnable(Context context, Runnable underlyingRunnable) {
this.context = context;
this.underlyingRunnable = underlyingRunnable;
}

@Override
public void run() {
final Storage.Scope scope = Kamon.storeContext(context);

try {
underlyingRunnable.run();
} finally {
scope.close();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.apache.pekko.actor.instrumentation;

import org.apache.pekko.actor.Cell;
import org.apache.pekko.actor.UnstartedCell;
import org.apache.pekko.actor.instrumentation.CellWrapper;
import kanela.agent.libs.net.bytebuddy.asm.Advice;

public class ReplaceWithAdvice {

@Advice.OnMethodEnter()
public static Cell enter(@Advice.Argument(value = 0, readOnly = false) Cell cell) {
Cell originalCell = cell;
cell = new CellWrapper(cell);
return originalCell;
}

@Advice.OnMethodExit()
public static void exit(@Advice.This UnstartedCell self, @Advice.Enter Cell originalCell) {
self.self().swapCell(originalCell);
}
}
Loading

0 comments on commit 2570a11

Please sign in to comment.