Skip to content

Commit

Permalink
514 - bring back mailbox size metric
Browse files Browse the repository at this point in the history
  • Loading branch information
skipper1982 authored and lgajowy committed Feb 15, 2023
1 parent 5d8d8d4 commit 176899f
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.scalatest.{ BeforeAndAfterEach, Inspectors, OptionValues }

import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.util.Random
import scala.util.control.NoStackTrace

final class AkkaActorTest
Expand Down Expand Up @@ -194,6 +195,31 @@ final class AkkaActorTest
}("", "work", "work")(messages -> check)
}

it should "record mailbox size properly" in {
val processingTime = 200
val actor = system.classicSystem.actorOf(SuspendActor.props(processingTime), createUniqueId)

def expectMailboxSize(run: Int, size: Int): Unit =
assertMetric("mesmer_akka_actor_mailbox_size") { data =>
val points = data.getLongSumData.getPoints.asScala
.filter(point =>
Option(point.getAttributes.get(AttributeKey.stringKey(AttributeNames.ActorPath)))
.contains(actor.path.toStringWithoutAddress)
)

points.map(_.getValue) should contain(size)
}
val runId = Random.nextInt()
actor ! Message
actor ! Message
expectMailboxSize(runId, 1)
actor ! Message
expectMailboxSize(runId, 2)
Thread.sleep(processingTime)
expectMailboxSize(runId, 1)

}

it should "record stash operation from actors beginning" in {
val stashActor = system.classicSystem.actorOf(ClassicStashActor.props(), createUniqueId)

Expand Down Expand Up @@ -449,6 +475,16 @@ object AkkaActorAgentTest {
// replies
final case class StashSize(stash: Option[Long])

object SuspendActor {
def props(processingTime: Long): Props = Props(new SuspendActor(processingTime))
}

class SuspendActor(processingTime: Long) extends classic.Actor {
def receive: Receive = {
case Message => Thread.sleep(processingTime)
}
}

object ClassicStashActor {
def props(): Props = Props(new ClassicStashActor)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ public static void exit(@Advice.Return Envelope envelope, @Advice.This Mailbox s
Instruments instruments = InstrumentsProvider.instance();

if (Objects.nonNull(context) && Objects.nonNull(attrs)) {

long interval = new Interval(System.nanoTime() - context.sentTime()).toMillis();

instruments.mailboxTime().record(interval, attrs);
instruments.mailboxSize().add(-1, attrs);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package akka.actor.impl;

import akka.actor.ActorContext;
import akka.dispatch.Mailbox;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.instrumentation.api.field.VirtualField;
import io.scalac.mesmer.otelextension.instrumentations.akka.actor.Instruments;
import io.scalac.mesmer.otelextension.instrumentations.akka.actor.InstrumentsProvider;
import net.bytebuddy.asm.Advice;

import java.util.Objects;

public class MailboxEnqueueAdvice {

@Advice.OnMethodExit
public static void exit(@Advice.This Mailbox self) {
if (Objects.nonNull(self.actor())
&& Objects.nonNull(self.actor().getSystem())) {
Attributes attrs = VirtualField.find(ActorContext.class, Attributes.class).get(self.actor());
Instruments instruments = InstrumentsProvider.instance();

if (Objects.nonNull(attrs)) {
instruments.mailboxSize().add(1, attrs);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public List<TypeInstrumentation> typeInstrumentations() {
AkkaActorAgent.actorCellInit(),
AkkaActorAgent.dispatchSendMessage(),
AkkaActorAgent.mailboxDequeue(),
AkkaActorAgent.mailboxEnqueue(),
AkkaActorAgent.classicStashSupportStashAdvice(),
AkkaActorAgent.classicStashSupportPrependAdvice(),
AkkaActorAgent.typedStashBufferAdvice(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ object AkkaActorAgent {
)
)

val mailboxEnqueue: TypeInstrumentation = Instrumentation(named("akka.dispatch.Mailbox"))
.`with`(
Advice(
named("enqueue"),
"akka.actor.impl.MailboxEnqueueAdvice"
)
)


val classicStashSupportStashAdvice: TypeInstrumentation =
Instrumentation(named("akka.actor.StashSupport"))
.`with`(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package io.scalac.mesmer.otelextension.instrumentations.akka.actor
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.metrics.LongCounter
import io.opentelemetry.api.metrics.LongHistogram
import io.opentelemetry.api.metrics.MeterProvider
import io.opentelemetry.api.metrics.{LongCounter, LongHistogram, LongUpDownCounter, MeterProvider}

trait Instruments {

Expand All @@ -16,6 +14,8 @@ trait Instruments {

def mailboxTime: LongHistogram

def mailboxSize: LongUpDownCounter

def stashedMessages: LongCounter

def sentMessages: LongCounter
Expand Down Expand Up @@ -55,6 +55,11 @@ object Instruments {
.ofLongs()
.build()

lazy val mailboxSize: LongUpDownCounter = provider
.get("mesmer")
.upDownCounterBuilder("mesmer_akka_actor_mailbox_size")
.build()

lazy val stashedMessages: LongCounter = provider
.get("mesmer")
.counterBuilder("mesmer_akka_actor_stashed_messages_total")
Expand Down

0 comments on commit 176899f

Please sign in to comment.