Skip to content

Commit

Permalink
Fix akka-http instrumentation for HTTP2
Browse files Browse the repository at this point in the history
  • Loading branch information
psnep committed Oct 29, 2024
1 parent f4898d9 commit 6e938a8
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 77 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package kamon.instrumentation.akka.http;

import akka.NotUsed;
import akka.http.scaladsl.model.HttpRequest;
import akka.http.scaladsl.model.HttpResponse;
import akka.stream.scaladsl.Flow;
import kanela.agent.libs.net.bytebuddy.asm.Advice;

public class Http2BlueprintAsyncAdvice {

public static class EndpointInfo {
public final String listenInterface;
public final int listenPort;

public EndpointInfo(String listenInterface, int listenPort) {
this.listenInterface = listenInterface;
this.listenPort = listenPort;
}
}

public static ThreadLocal<EndpointInfo> currentEndpoint = new ThreadLocal<>();

@Advice.OnMethodExit
public static void onExit(@Advice.Return(readOnly = false) Flow<HttpRequest, HttpResponse, NotUsed> returnedFlow) {
EndpointInfo bindAndHandlerEndpoint = currentEndpoint.get();

if(bindAndHandlerEndpoint != null) {
returnedFlow = ServerFlowWrapper.apply(
returnedFlow,
bindAndHandlerEndpoint.listenInterface,
bindAndHandlerEndpoint.listenPort
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,20 @@

package kamon.instrumentation.akka.http;

import akka.http.scaladsl.model.HttpRequest;
import akka.http.scaladsl.model.HttpResponse;
import kanela.agent.libs.net.bytebuddy.asm.Advice;
import scala.Function1;
import scala.concurrent.Future;

public class Http2ExtBindAndHandleAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.Argument(value = 0, readOnly = false) Function1<HttpRequest, Future<HttpResponse>> handler,
@Advice.Argument(1) String iface,
@Advice.Argument(2) Integer port) {
public static void onEnter(@Advice.Argument(1) String iface, @Advice.Argument(2) Integer port) {

FlowOpsMapAsyncAdvice.currentEndpoint.set(new FlowOpsMapAsyncAdvice.EndpointInfo(iface, port));
handler = new Http2BlueprintInterceptor.HandlerWithEndpoint(iface, port, handler);
Http2BlueprintAsyncAdvice.currentEndpoint.set(new Http2BlueprintAsyncAdvice.EndpointInfo(iface, port));
}

@Advice.OnMethodExit
public static void onExit() {
FlowOpsMapAsyncAdvice.currentEndpoint.remove();
Http2BlueprintAsyncAdvice.currentEndpoint.remove();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ class AkkaHttpServerInstrumentation extends InstrumentationBuilder {

/**
* For the HTTP/2 instrumentation, since the parts where we can capture the interface/port and the actual flow
* creation happen at different times we are wrapping the handler with the interface/port data and reading that
* information when turning the handler function into a flow and wrapping it the same way we would for HTTP/1.
* creation happen at different times we are advising the handleWithStreamIdHeader method with the interface/port
* data and reading that information on method exit to wrap it the same way we would for HTTP/1.
*/
onType("akka.http.scaladsl.Http2Ext")
.advise(method("bindAndHandleAsync") and isPublic(), classOf[Http2ExtBindAndHandleAdvice])

onType("akka.http.impl.engine.http2.Http2Blueprint$")
.intercept(method("handleWithStreamIdHeader"), Http2BlueprintInterceptor)
.advise(method("handleWithStreamIdHeader"), classOf[Http2BlueprintAsyncAdvice])

/**
* The rest of these sections are just about making sure that we can generate an appropriate operation name (i.e. free
Expand Down Expand Up @@ -306,6 +306,7 @@ object PathDirectivesRawPathPrefixInterceptor {
}
}


object Http2BlueprintInterceptor {

case class HandlerWithEndpoint(interface: String, port: Int, handler: HttpRequest => Future[HttpResponse])
Expand All @@ -315,10 +316,8 @@ object Http2BlueprintInterceptor {
}

@RuntimeType
def handleWithStreamIdHeader(
@Argument(1) handler: HttpRequest => Future[HttpResponse],
@SuperCall zuper: Callable[Flow[HttpRequest, HttpResponse, NotUsed]]
): Flow[HttpRequest, HttpResponse, NotUsed] = {
def handleWithStreamIdHeader(@Argument(1) handler: HttpRequest => Future[HttpResponse],
@SuperCall zuper: Callable[Flow[HttpRequest, HttpResponse, NotUsed]]): Flow[HttpRequest, HttpResponse, NotUsed] = {

handler match {
case HandlerWithEndpoint(interface, port, _) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ class AkkaHttpServerInstrumentation extends InstrumentationBuilder {

/**
* For the HTTP/2 instrumentation, since the parts where we can capture the interface/port and the actual flow
* creation happen at different times we are wrapping the handler with the interface/port data and reading that
* information when turning the handler function into a flow and wrapping it the same way we would for HTTP/1.
* creation happen at different times we are advising the handleWithStreamIdHeader method with the interface/port
* data and reading that information on method exit to wrap it the same way we would for HTTP/1.
*/

onType("akka.http.impl.engine.http2.Http2Ext")
.advise(method("bindAndHandleAsync") and isPublic(), classOf[Http2ExtBindAndHandleAdvice])

onType("akka.http.impl.engine.http2.Http2Blueprint$")
.intercept(method("handleWithStreamIdHeader"), Http2BlueprintInterceptor)
.advise(method("handleWithStreamIdHeader"), classOf[Http2BlueprintAsyncAdvice])

/**
* The rest of these sections are just about making sure that we can generate an appropriate operation name (i.e. free
Expand Down Expand Up @@ -329,27 +329,3 @@ object PathDirectivesRawPathPrefixInterceptor {
}
}
}

object Http2BlueprintInterceptor {

case class HandlerWithEndpoint(interface: String, port: Int, handler: HttpRequest => Future[HttpResponse])
extends (HttpRequest => Future[HttpResponse]) {

override def apply(request: HttpRequest): Future[HttpResponse] = handler(request)
}

@RuntimeType
def handleWithStreamIdHeader(
@Argument(1) handler: HttpRequest => Future[HttpResponse],
@SuperCall zuper: Callable[Flow[HttpRequest, HttpResponse, NotUsed]]
): Flow[HttpRequest, HttpResponse, NotUsed] = {

handler match {
case HandlerWithEndpoint(interface, port, _) =>
ServerFlowWrapper(zuper.call(), interface, port)

case _ =>
zuper.call()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,22 @@ package kamon.instrumentation.akka.http
import java.util.concurrent.Callable
import akka.http.scaladsl.marshalling.{ToEntityMarshaller, ToResponseMarshallable, ToResponseMarshaller}
import akka.http.scaladsl.model.StatusCodes.Redirection
import akka.http.scaladsl.model.{HttpHeader, HttpRequest, HttpResponse, StatusCode, Uri}
import akka.http.scaladsl.model.{HttpHeader, StatusCode, Uri}
import akka.http.scaladsl.server.PathMatcher.{Matched, Unmatched}
import akka.http.scaladsl.server.directives.{BasicDirectives, CompleteOrRecoverWithMagnet, OnSuccessMagnet}
import akka.http.scaladsl.server.directives.RouteDirectives.reject
import akka.http.scaladsl.server._
import akka.http.scaladsl.server.util.{Tuple, Tupler}
import akka.http.scaladsl.util.FastFuture
import kamon.Kamon
import kamon.instrumentation.akka.http.HasMatchingContext.PathMatchingContext
import kamon.instrumentation.context.{HasContext, InvokeWithCapturedContext}
import kanela.agent.api.instrumentation.InstrumentationBuilder
import kanela.agent.api.instrumentation.mixin.Initializer
import kanela.agent.libs.net.bytebuddy.implementation.bind.annotation._

import scala.concurrent.{Batchable, ExecutionContext, Future, Promise}
import scala.util.control.NonFatal
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}
import java.util.regex.Pattern
import akka.NotUsed
import akka.http.scaladsl.server.RouteResult.Rejected
import akka.stream.scaladsl.Flow
import kamon.context.Context
import kanela.agent.libs.net.bytebuddy.matcher.ElementMatchers.isPublic

Expand All @@ -46,15 +41,15 @@ class AkkaHttpServerInstrumentation extends InstrumentationBuilder {

/**
* For the HTTP/2 instrumentation, since the parts where we can capture the interface/port and the actual flow
* creation happen at different times we are wrapping the handler with the interface/port data and reading that
* information when turning the handler function into a flow and wrapping it the same way we would for HTTP/1.
* creation happen at different times we are advising the handleWithStreamIdHeader method with the interface/port
* data and reading that information on method exit to wrap it the same way we would for HTTP/1.
*
*/
onType("akka.http.impl.engine.http2.Http2Ext")
.advise(method("bindAndHandleAsync") and isPublic(), classOf[Http2ExtBindAndHandleAdvice])

onType("akka.http.impl.engine.http2.Http2Blueprint$")
.intercept(method("handleWithStreamIdHeader"), classOf[Http2BlueprintInterceptor])
.advise(method("handleWithStreamIdHeader"), classOf[Http2BlueprintAsyncAdvice])

/**
* The rest of these sections are just about making sure that we can generate an appropriate operation name (i.e. free
Expand Down Expand Up @@ -314,28 +309,3 @@ object PathDirectivesRawPathPrefixInterceptor {
}
}
}

class Http2BlueprintInterceptor
object Http2BlueprintInterceptor {

case class HandlerWithEndpoint(interface: String, port: Int, handler: HttpRequest => Future[HttpResponse])
extends (HttpRequest => Future[HttpResponse]) {

override def apply(request: HttpRequest): Future[HttpResponse] = handler(request)
}

@RuntimeType
@static def handleWithStreamIdHeader(
@Argument(1) handler: HttpRequest => Future[HttpResponse],
@SuperCall zuper: Callable[Flow[HttpRequest, HttpResponse, NotUsed]]
): Flow[HttpRequest, HttpResponse, NotUsed] = {

handler match {
case HandlerWithEndpoint(interface, port, _) =>
ServerFlowWrapper(zuper.call(), interface, port)

case _ =>
zuper.call()
}
}
}

0 comments on commit 6e938a8

Please sign in to comment.