diff --git a/otel-extension/src/it/scala/io/scalac/mesmer/instrumentation/akka/actor/otel/AkkaActorTest.scala b/otel-extension/src/it/scala/io/scalac/mesmer/instrumentation/akka/actor/otel/AkkaActorTest.scala index aa6b1ed05..31326b5f7 100644 --- a/otel-extension/src/it/scala/io/scalac/mesmer/instrumentation/akka/actor/otel/AkkaActorTest.scala +++ b/otel-extension/src/it/scala/io/scalac/mesmer/instrumentation/akka/actor/otel/AkkaActorTest.scala @@ -20,6 +20,7 @@ import org.scalatest.{ BeforeAndAfterEach, Inspectors, OptionValues } import scala.concurrent.duration._ import scala.jdk.CollectionConverters._ +import scala.util.Random import scala.util.control.NoStackTrace final class AkkaActorTest @@ -194,6 +195,31 @@ final class AkkaActorTest }("", "work", "work")(messages -> check) } + it should "record mailbox size properly" in { + val processingTime = 200 + val actor = system.classicSystem.actorOf(SuspendActor.props(processingTime), createUniqueId) + + def expectMailboxSize(run: Int, size: Int): Unit = + assertMetric("mesmer_akka_actor_mailbox_size") { data => + val points = data.getLongSumData.getPoints.asScala + .filter(point => + Option(point.getAttributes.get(AttributeKey.stringKey(AttributeNames.ActorPath))) + .contains(actor.path.toStringWithoutAddress) + ) + + points.map(_.getValue) should contain(size) + } + val runId = Random.nextInt() + actor ! Message + actor ! Message + expectMailboxSize(runId, 1) + actor ! Message + expectMailboxSize(runId, 2) + Thread.sleep(processingTime) + expectMailboxSize(runId, 1) + + } + it should "record stash operation from actors beginning" in { val stashActor = system.classicSystem.actorOf(ClassicStashActor.props(), createUniqueId) @@ -449,6 +475,16 @@ object AkkaActorAgentTest { // replies final case class StashSize(stash: Option[Long]) + object SuspendActor { + def props(processingTime: Long): Props = Props(new SuspendActor(processingTime)) + } + + class SuspendActor(processingTime: Long) extends classic.Actor { + def receive: Receive = { + case Message => Thread.sleep(processingTime) + } + } + object ClassicStashActor { def props(): Props = Props(new ClassicStashActor) } diff --git a/otel-extension/src/main/java/akka/actor/impl/MailboxDequeueAdvice.java b/otel-extension/src/main/java/akka/actor/impl/MailboxDequeueAdvice.java index 3e4bfc070..d82a2dc99 100644 --- a/otel-extension/src/main/java/akka/actor/impl/MailboxDequeueAdvice.java +++ b/otel-extension/src/main/java/akka/actor/impl/MailboxDequeueAdvice.java @@ -25,9 +25,10 @@ public static void exit(@Advice.Return Envelope envelope, @Advice.This Mailbox s Instruments instruments = InstrumentsProvider.instance(); if (Objects.nonNull(context) && Objects.nonNull(attrs)) { - long interval = new Interval(System.nanoTime() - context.sentTime()).toMillis(); + instruments.mailboxTime().record(interval, attrs); + instruments.mailboxSize().add(-1, attrs); } } } diff --git a/otel-extension/src/main/java/akka/actor/impl/MailboxEnqueueAdvice.java b/otel-extension/src/main/java/akka/actor/impl/MailboxEnqueueAdvice.java new file mode 100644 index 000000000..d1b8ac34a --- /dev/null +++ b/otel-extension/src/main/java/akka/actor/impl/MailboxEnqueueAdvice.java @@ -0,0 +1,27 @@ +package akka.actor.impl; + +import akka.actor.ActorContext; +import akka.dispatch.Mailbox; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.instrumentation.api.field.VirtualField; +import io.scalac.mesmer.otelextension.instrumentations.akka.actor.Instruments; +import io.scalac.mesmer.otelextension.instrumentations.akka.actor.InstrumentsProvider; +import net.bytebuddy.asm.Advice; + +import java.util.Objects; + +public class MailboxEnqueueAdvice { + + @Advice.OnMethodExit + public static void exit(@Advice.This Mailbox self) { + if (Objects.nonNull(self.actor()) + && Objects.nonNull(self.actor().getSystem())) { + Attributes attrs = VirtualField.find(ActorContext.class, Attributes.class).get(self.actor()); + Instruments instruments = InstrumentsProvider.instance(); + + if (Objects.nonNull(attrs)) { + instruments.mailboxSize().add(1, attrs); + } + } + } +} diff --git a/otel-extension/src/main/java/io/scalac/mesmer/otelextension/akka/MesmerAkkaActorInstrumentationModule.java b/otel-extension/src/main/java/io/scalac/mesmer/otelextension/akka/MesmerAkkaActorInstrumentationModule.java index 7780e0f31..d661db48a 100644 --- a/otel-extension/src/main/java/io/scalac/mesmer/otelextension/akka/MesmerAkkaActorInstrumentationModule.java +++ b/otel-extension/src/main/java/io/scalac/mesmer/otelextension/akka/MesmerAkkaActorInstrumentationModule.java @@ -27,6 +27,7 @@ public List typeInstrumentations() { AkkaActorAgent.actorCellInit(), AkkaActorAgent.dispatchSendMessage(), AkkaActorAgent.mailboxDequeue(), + AkkaActorAgent.mailboxEnqueue(), AkkaActorAgent.classicStashSupportStashAdvice(), AkkaActorAgent.classicStashSupportPrependAdvice(), AkkaActorAgent.typedStashBufferAdvice(), diff --git a/otel-extension/src/main/scala/io/scalac/mesmer/otelextension/instrumentations/akka/actor/AkkaActorAgent.scala b/otel-extension/src/main/scala/io/scalac/mesmer/otelextension/instrumentations/akka/actor/AkkaActorAgent.scala index c179957a1..943bfd338 100644 --- a/otel-extension/src/main/scala/io/scalac/mesmer/otelextension/instrumentations/akka/actor/AkkaActorAgent.scala +++ b/otel-extension/src/main/scala/io/scalac/mesmer/otelextension/instrumentations/akka/actor/AkkaActorAgent.scala @@ -41,6 +41,15 @@ object AkkaActorAgent { ) ) + val mailboxEnqueue: TypeInstrumentation = Instrumentation(named("akka.dispatch.Mailbox")) + .`with`( + Advice( + named("enqueue"), + "akka.actor.impl.MailboxEnqueueAdvice" + ) + ) + + val classicStashSupportStashAdvice: TypeInstrumentation = Instrumentation(named("akka.actor.StashSupport")) .`with`( diff --git a/otel-extension/src/main/scala/io/scalac/mesmer/otelextension/instrumentations/akka/actor/Instruments.scala b/otel-extension/src/main/scala/io/scalac/mesmer/otelextension/instrumentations/akka/actor/Instruments.scala index 27ea59ac3..7ddaec60d 100644 --- a/otel-extension/src/main/scala/io/scalac/mesmer/otelextension/instrumentations/akka/actor/Instruments.scala +++ b/otel-extension/src/main/scala/io/scalac/mesmer/otelextension/instrumentations/akka/actor/Instruments.scala @@ -1,8 +1,6 @@ package io.scalac.mesmer.otelextension.instrumentations.akka.actor import io.opentelemetry.api.GlobalOpenTelemetry -import io.opentelemetry.api.metrics.LongCounter -import io.opentelemetry.api.metrics.LongHistogram -import io.opentelemetry.api.metrics.MeterProvider +import io.opentelemetry.api.metrics.{LongCounter, LongHistogram, LongUpDownCounter, MeterProvider} trait Instruments { @@ -16,6 +14,8 @@ trait Instruments { def mailboxTime: LongHistogram + def mailboxSize: LongUpDownCounter + def stashedMessages: LongCounter def sentMessages: LongCounter @@ -55,6 +55,11 @@ object Instruments { .ofLongs() .build() + lazy val mailboxSize: LongUpDownCounter = provider + .get("mesmer") + .upDownCounterBuilder("mesmer_akka_actor_mailbox_size") + .build() + lazy val stashedMessages: LongCounter = provider .get("mesmer") .counterBuilder("mesmer_akka_actor_stashed_messages_total")