From f4fba080f50917d2c8839387bcc588348af5bba6 Mon Sep 17 00:00:00 2001 From: Renato Cavalcanti Date: Thu, 21 Nov 2024 12:35:07 +0100 Subject: [PATCH] feat: Improved custom tracing span API (#10) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: Improved custom tracing span API * Drop the tracing context, provide access to parent span if needed, update docs * Improved javadoc * review feedback --------- Co-authored-by: Johan Andrén --- .../impl/TestKitCommandContextTimed.scala | 7 +- ...tKitEventSourcedEntityCommandContext.scala | 3 + .../TestKitEventSourcedEntityContext.scala | 2 +- ...estKitEventSourcedEntityEventContext.scala | 2 +- .../TestKitKeyValueEntityCommandContext.scala | 5 +- .../impl/TestKitKeyValueEntityContext.scala | 2 +- .../javasdk/testkit/impl/TestKitTracing.scala | 20 ++++ .../components/tracing/Batches.java | 27 ------ .../components/tracing/Traces.java | 13 --- .../src/main/java/akka/javasdk/Metadata.java | 7 -- .../main/java/akka/javasdk/TraceContext.java | 48 --------- .../src/main/java/akka/javasdk/Tracing.java | 38 ++++++++ .../akka/javasdk/consumer/MessageContext.java | 10 +- .../eventsourcedentity/CommandContext.java | 4 + .../akka/javasdk/http/RequestContext.java | 6 +- .../keyvalueentity/CommandContext.java | 4 + .../javasdk/timedaction/CommandContext.java | 10 +- .../akka/javasdk/workflow/CommandContext.java | 4 + .../akka/javasdk/impl/MetadataImpl.scala | 19 ++-- .../scala/akka/javasdk/impl/SdkRunner.scala | 24 +++-- .../javasdk/impl/action/ActionsImpl.scala | 29 +++--- .../javasdk/impl/consumer/ConsumersImpl.scala | 11 ++- .../EventSourcedEntitiesImpl.scala | 15 ++- .../keyvalueentity/KeyValueEntitiesImpl.scala | 20 ++-- .../impl/telemetry/SpanTracingImpl.scala | 31 ++++++ .../javasdk/impl/telemetry/Telemetry.scala | 10 +- .../akka/javasdk/impl/view/ViewsImpl.scala | 4 +- .../javasdk/impl/workflow/WorkflowImpl.scala | 97 ++++++++++++------- .../akka/javasdk/timedaction/TestTracing.java | 20 +++- .../akka/javasdk/impl/ConsumersImplSpec.scala | 6 +- .../impl/action/TimedActionHandlerSpec.scala | 2 +- .../pages/setup-and-dependency-injection.adoc | 2 +- samples/tracing/README.md | 27 +++--- samples/tracing/docker-compose.yml | 1 - .../example/tracing/api/TracingEndpoint.java | 48 ++++++++- .../tracing/application/TracingAction.java | 56 ++++++----- .../{domain => application}/Typicode.java | 40 ++++++-- 37 files changed, 404 insertions(+), 270 deletions(-) create mode 100644 akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitTracing.scala delete mode 100644 akka-javasdk-tests/src/test/java/akkajavasdk/components/tracing/Batches.java delete mode 100644 akka-javasdk-tests/src/test/java/akkajavasdk/components/tracing/Traces.java delete mode 100644 akka-javasdk/src/main/java/akka/javasdk/TraceContext.java create mode 100644 akka-javasdk/src/main/java/akka/javasdk/Tracing.java create mode 100644 akka-javasdk/src/main/scala/akka/javasdk/impl/telemetry/SpanTracingImpl.scala rename samples/tracing/src/main/java/com/example/tracing/{domain => application}/Typicode.java (52%) diff --git a/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitCommandContextTimed.scala b/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitCommandContextTimed.scala index 5a6c16d81..da3c84596 100644 --- a/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitCommandContextTimed.scala +++ b/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitCommandContextTimed.scala @@ -5,14 +5,13 @@ package akka.javasdk.testkit.impl import akka.javasdk.Metadata +import akka.javasdk.Tracing import akka.javasdk.impl.InternalContext import akka.javasdk.testkit.MockRegistry import akka.javasdk.timedaction.CommandContext -import io.opentelemetry.api.OpenTelemetry -import io.opentelemetry.api.trace.Tracer /** - * INTERNAL API Used by the generated testkit + * INTERNAL API Used by the testkit */ final class TestKitCommandContextTimed(metadata: Metadata, mockRegistry: MockRegistry = MockRegistry.EMPTY) extends AbstractTestKitContext(mockRegistry) @@ -29,5 +28,5 @@ final class TestKitCommandContextTimed(metadata: Metadata, mockRegistry: MockReg override def metadata() = metadata - override def getTracer: Tracer = OpenTelemetry.noop().getTracer("noop") + override def tracing(): Tracing = TestKitTracing } diff --git a/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitEventSourcedEntityCommandContext.scala b/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitEventSourcedEntityCommandContext.scala index 6bc39376c..581ba95f9 100644 --- a/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitEventSourcedEntityCommandContext.scala +++ b/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitEventSourcedEntityCommandContext.scala @@ -5,6 +5,7 @@ package akka.javasdk.testkit.impl import akka.javasdk.Metadata +import akka.javasdk.Tracing import akka.javasdk.eventsourcedentity.CommandContext import akka.javasdk.impl.InternalContext @@ -22,6 +23,8 @@ final class TestKitEventSourcedEntityCommandContext( this(metadata = metadata, commandName = "stubCommandName") } + override def tracing(): Tracing = TestKitTracing + } object TestKitEventSourcedEntityCommandContext { diff --git a/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitEventSourcedEntityContext.scala b/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitEventSourcedEntityContext.scala index 0a95ec8c2..b6d43b559 100644 --- a/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitEventSourcedEntityContext.scala +++ b/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitEventSourcedEntityContext.scala @@ -8,7 +8,7 @@ import akka.javasdk.eventsourcedentity.EventSourcedEntityContext import akka.javasdk.testkit.MockRegistry /** - * INTERNAL API Used by the generated testkit + * INTERNAL API Used by the testkit */ final class TestKitEventSourcedEntityContext( override val entityId: String, diff --git a/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitEventSourcedEntityEventContext.scala b/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitEventSourcedEntityEventContext.scala index e4166b9b0..e0e9d70f7 100644 --- a/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitEventSourcedEntityEventContext.scala +++ b/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitEventSourcedEntityEventContext.scala @@ -7,7 +7,7 @@ package akka.javasdk.testkit.impl import akka.javasdk.eventsourcedentity.EventContext /** - * INTERNAL API Used by the generated testkit + * INTERNAL API Used by the testkit */ final class TestKitEventSourcedEntityEventContext extends EventContext { override def entityId = "testkit-entity-id" diff --git a/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitKeyValueEntityCommandContext.scala b/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitKeyValueEntityCommandContext.scala index 64c959145..79d02f311 100644 --- a/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitKeyValueEntityCommandContext.scala +++ b/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitKeyValueEntityCommandContext.scala @@ -5,12 +5,13 @@ package akka.javasdk.testkit.impl import akka.javasdk.Metadata +import akka.javasdk.Tracing import akka.javasdk.keyvalueentity.CommandContext import akka.javasdk.keyvalueentity.KeyValueEntityContext import akka.javasdk.testkit.MockRegistry /** - * INTERNAL API Used by the generated testkit + * INTERNAL API Used by the testkit */ final class TestKitKeyValueEntityCommandContext( override val entityId: String, @@ -26,4 +27,6 @@ final class TestKitKeyValueEntityCommandContext( this(entityId = entityId, metadata = metadata, commandName = "stubCommandName") } + override def tracing(): Tracing = TestKitTracing + } diff --git a/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitKeyValueEntityContext.scala b/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitKeyValueEntityContext.scala index f7809c278..08e82e02d 100644 --- a/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitKeyValueEntityContext.scala +++ b/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitKeyValueEntityContext.scala @@ -8,7 +8,7 @@ import akka.javasdk.keyvalueentity.KeyValueEntityContext import akka.javasdk.testkit.MockRegistry /** - * INTERNAL API Used by the generated testkit + * INTERNAL API Used by the testkit */ final class TestKitKeyValueEntityContext(override val entityId: String, mockRegistry: MockRegistry = MockRegistry.EMPTY) extends AbstractTestKitContext(mockRegistry) diff --git a/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitTracing.scala b/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitTracing.scala new file mode 100644 index 000000000..0e9dd265a --- /dev/null +++ b/akka-javasdk-testkit/src/main/scala/akka/javasdk/testkit/impl/TestKitTracing.scala @@ -0,0 +1,20 @@ +/* + * Copyright (C) 2021-2024 Lightbend Inc. + */ + +package akka.javasdk.testkit.impl + +import akka.javasdk.Tracing +import io.opentelemetry.api.trace.Span + +import java.util.Optional + +/** + * INTERNAL API + */ +object TestKitTracing extends Tracing { + + override def startSpan(name: String): Optional[Span] = Optional.empty() + + override def parentSpan(): Optional[Span] = Optional.empty() +} diff --git a/akka-javasdk-tests/src/test/java/akkajavasdk/components/tracing/Batches.java b/akka-javasdk-tests/src/test/java/akkajavasdk/components/tracing/Batches.java deleted file mode 100644 index 10cceae8c..000000000 --- a/akka-javasdk-tests/src/test/java/akkajavasdk/components/tracing/Batches.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright (C) 2021-2024 Lightbend Inc. - */ - -package akkajavasdk.components.tracing; - -import java.util.List; - -public record Batches(List batches ){ - - public record Batch(Resource resource, List scopeSpans){ - public record Resource(List attributes){} - - public record ScopeSpan(Scope scope, List spans){ - public record Scope(String name){} - public record Span(String traceId, String spanId, String name, String kind, List attributes){} - } - } - - public record Attribute(String key, Value value) { - public record Value(String stringValue) {} - } - -} - - - diff --git a/akka-javasdk-tests/src/test/java/akkajavasdk/components/tracing/Traces.java b/akka-javasdk-tests/src/test/java/akkajavasdk/components/tracing/Traces.java deleted file mode 100644 index 6a182828b..000000000 --- a/akka-javasdk-tests/src/test/java/akkajavasdk/components/tracing/Traces.java +++ /dev/null @@ -1,13 +0,0 @@ -/* - * Copyright (C) 2021-2024 Lightbend Inc. - */ - -package akkajavasdk.components.tracing; - - -import java.util.List; - -public record Traces(List traces) { - public record Trace(String traceID, String rootServiceName, String rootTraceName) { - } -} diff --git a/akka-javasdk/src/main/java/akka/javasdk/Metadata.java b/akka-javasdk/src/main/java/akka/javasdk/Metadata.java index 89f5170c1..6ff6ba981 100644 --- a/akka-javasdk/src/main/java/akka/javasdk/Metadata.java +++ b/akka-javasdk/src/main/java/akka/javasdk/Metadata.java @@ -213,13 +213,6 @@ public interface Metadata extends Iterable { */ CloudEvent asCloudEvent(String id, URI source, String type); - - /** - * Get the trace context associated with this request metadata. - * @return The trace context. - */ - TraceContext traceContext(); - /** * Merge the given Metadata entries with this Metadata. If the same key is present in both, both values will be kept. * diff --git a/akka-javasdk/src/main/java/akka/javasdk/TraceContext.java b/akka-javasdk/src/main/java/akka/javasdk/TraceContext.java deleted file mode 100644 index 20175205a..000000000 --- a/akka-javasdk/src/main/java/akka/javasdk/TraceContext.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright (C) 2021-2024 Lightbend Inc. - */ - -package akka.javasdk; - -import io.opentelemetry.context.Context; - -import java.util.Optional; - -/** Utility interface for trace context helper methods. */ -public interface TraceContext { - - /** - * Allows retrieving the trace context as an OpenTelemetry context for easier construction of - * child spans. If the trace context is not available, a new empty context will be returned. - * - * @return the trace context as an OpenTelemetry context. - */ - Context asOpenTelemetryContext(); - - /** - * Allows retrieving the trace parent for easier injection in external calls (e.g. HTTP request - * headers). - * - * @return the trace parent using W3C Trace Context format. - * @see W3C Trace - * Context section 3 - */ - Optional traceParent(); - - /** - * Allows retrieving the trace state for easier injection in external calls (e.g. HTTP request - * headers). - * - * @return the trace state using W3C Trace Context format. - * @see W3C Trace - * Context section 3 - */ - Optional traceState(); - - /** - * Allows retrieving the trace id of the trace parent if any. - * - * @return the traceId of the traceParent if any - */ - Optional traceId(); -} diff --git a/akka-javasdk/src/main/java/akka/javasdk/Tracing.java b/akka-javasdk/src/main/java/akka/javasdk/Tracing.java new file mode 100644 index 000000000..bb7d1e027 --- /dev/null +++ b/akka-javasdk/src/main/java/akka/javasdk/Tracing.java @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2021-2024 Lightbend Inc. + */ + +package akka.javasdk; + +import akka.annotation.DoNotInherit; +import io.opentelemetry.api.trace.Span; + +import java.util.Optional; + +/** + * Factory for manually creating open telemetry spans in addition to those automatically provided by + * the runtime and SDK. + * + *

Not for user extension. Injectable into endpoint constructors or available through component + * command contexts. + */ +@DoNotInherit +public interface Tracing { + /** + * If tracing is enabled, create and start a new custom span with the given name, setting a parent + * for the span is done automatically so that the span is a child of the incoming request or + * component call. + * + * @return Optional of the span if tracing is enabled, empty option if tracing is not enabled. + */ + Optional startSpan(String name); + + /** + * If tracing is enabled, this returns the current parent span, to use for propagating trace + * parent through third party integrations. This span should only be used for observing, ending it + * or marking it as failed etc. is managed by the SDK and the runtime. + * + * @see {{@link #startSpan(String)}} for creating a custom span tied to some logic in a service. + */ + Optional parentSpan(); +} diff --git a/akka-javasdk/src/main/java/akka/javasdk/consumer/MessageContext.java b/akka-javasdk/src/main/java/akka/javasdk/consumer/MessageContext.java index c1a9e3be6..56223d348 100644 --- a/akka-javasdk/src/main/java/akka/javasdk/consumer/MessageContext.java +++ b/akka-javasdk/src/main/java/akka/javasdk/consumer/MessageContext.java @@ -6,6 +6,7 @@ import akka.javasdk.CloudEvent; import akka.javasdk.MetadataContext; +import akka.javasdk.Tracing; import io.opentelemetry.api.trace.Tracer; import java.util.Optional; @@ -19,11 +20,6 @@ public interface MessageContext extends MetadataContext { */ Optional eventSubject(); - /** - * Get an OpenTelemetry tracer for the current message. This will allow for building and automatic - * exporting of spans. - * - * @return A tracer for the current message, if tracing is configured. Otherwise, a noops tracer. - */ - Tracer getTracer(); + /** Access to tracing for custom app specific tracing. */ + Tracing tracing(); } diff --git a/akka-javasdk/src/main/java/akka/javasdk/eventsourcedentity/CommandContext.java b/akka-javasdk/src/main/java/akka/javasdk/eventsourcedentity/CommandContext.java index 22eebf9d8..325596d29 100644 --- a/akka-javasdk/src/main/java/akka/javasdk/eventsourcedentity/CommandContext.java +++ b/akka-javasdk/src/main/java/akka/javasdk/eventsourcedentity/CommandContext.java @@ -5,6 +5,7 @@ package akka.javasdk.eventsourcedentity; import akka.javasdk.MetadataContext; +import akka.javasdk.Tracing; /** An event sourced command context. */ public interface CommandContext extends MetadataContext { @@ -35,4 +36,7 @@ public interface CommandContext extends MetadataContext { * @return The entity id. */ String entityId(); + + /** Access to tracing for custom app specific tracing. */ + Tracing tracing(); } diff --git a/akka-javasdk/src/main/java/akka/javasdk/http/RequestContext.java b/akka-javasdk/src/main/java/akka/javasdk/http/RequestContext.java index 96bf41b2b..0a1b2bf39 100644 --- a/akka-javasdk/src/main/java/akka/javasdk/http/RequestContext.java +++ b/akka-javasdk/src/main/java/akka/javasdk/http/RequestContext.java @@ -8,8 +8,7 @@ import akka.javasdk.Context; import akka.javasdk.JwtClaims; import akka.javasdk.Principals; - -import java.util.Optional; +import akka.javasdk.Tracing; /** * Not for user extension, can be injected as constructor parameter into HTTP endpoint components @@ -26,4 +25,7 @@ public interface RequestContext extends Context { /** @return The JWT claims, if any, associated with this request. */ JwtClaims getJwtClaims(); + + /** Access to tracing for custom app specific tracing. */ + Tracing tracing(); } diff --git a/akka-javasdk/src/main/java/akka/javasdk/keyvalueentity/CommandContext.java b/akka-javasdk/src/main/java/akka/javasdk/keyvalueentity/CommandContext.java index 38224a42f..dda78c979 100644 --- a/akka-javasdk/src/main/java/akka/javasdk/keyvalueentity/CommandContext.java +++ b/akka-javasdk/src/main/java/akka/javasdk/keyvalueentity/CommandContext.java @@ -5,6 +5,7 @@ package akka.javasdk.keyvalueentity; import akka.javasdk.MetadataContext; +import akka.javasdk.Tracing; /** A value based entity command context. */ public interface CommandContext extends MetadataContext { @@ -29,4 +30,7 @@ public interface CommandContext extends MetadataContext { * @return The entity id. */ String entityId(); + + /** Access to tracing for custom app specific tracing. */ + Tracing tracing(); } diff --git a/akka-javasdk/src/main/java/akka/javasdk/timedaction/CommandContext.java b/akka-javasdk/src/main/java/akka/javasdk/timedaction/CommandContext.java index f90cebbb7..ca24eab10 100644 --- a/akka-javasdk/src/main/java/akka/javasdk/timedaction/CommandContext.java +++ b/akka-javasdk/src/main/java/akka/javasdk/timedaction/CommandContext.java @@ -5,16 +5,12 @@ package akka.javasdk.timedaction; import akka.javasdk.MetadataContext; +import akka.javasdk.Tracing; import io.opentelemetry.api.trace.Tracer; /** Context for action calls. */ public interface CommandContext extends MetadataContext { - /** - * Get an OpenTelemetry tracer for the current action. This will allow for building and automatic - * exporting of spans. - * - * @return A tracer for the current action, if tracing is configured. Otherwise, a noops tracer. - */ - Tracer getTracer(); + /** Access to tracing for custom app specific tracing. */ + Tracing tracing(); } diff --git a/akka-javasdk/src/main/java/akka/javasdk/workflow/CommandContext.java b/akka-javasdk/src/main/java/akka/javasdk/workflow/CommandContext.java index a10412faf..018cefbb1 100644 --- a/akka-javasdk/src/main/java/akka/javasdk/workflow/CommandContext.java +++ b/akka-javasdk/src/main/java/akka/javasdk/workflow/CommandContext.java @@ -5,6 +5,7 @@ package akka.javasdk.workflow; import akka.javasdk.MetadataContext; +import akka.javasdk.Tracing; /** A value based workflow command context. */ public interface CommandContext extends MetadataContext { @@ -29,4 +30,7 @@ public interface CommandContext extends MetadataContext { * @return The workflow id. */ String workflowId(); + + /** Access to tracing for custom app specific tracing. */ + Tracing tracing(); } diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/MetadataImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/MetadataImpl.scala index 5db3bf450..8f25d98cc 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/MetadataImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/MetadataImpl.scala @@ -18,7 +18,6 @@ import scala.jdk.OptionConverters._ import akka.annotation.InternalApi import akka.javasdk.CloudEvent import akka.javasdk.Metadata -import akka.javasdk.TraceContext import akka.javasdk.impl.telemetry.Telemetry import akka.javasdk.impl.telemetry.Telemetry.metadataGetter import com.google.protobuf.ByteString @@ -207,22 +206,16 @@ private[javasdk] class MetadataImpl private (val entries: Seq[MetadataEntry]) ex override def asMetadata(): Metadata = this - override lazy val traceContext: TraceContext = new TraceContext { - override def asOpenTelemetryContext(): OtelContext = W3CTraceContextPropagator + lazy val traceId: Option[String] = { + val otelContext = W3CTraceContextPropagator .getInstance() .extract(OtelContext.current(), asMetadata(), metadataGetter) - override def traceId(): Optional[String] = { - Span.fromContext(asOpenTelemetryContext()).getSpanContext.getTraceId match { - case "00000000000000000000000000000000" => - Optional.empty() // when no traceId returns io.opentelemetry.api.trace.TraceId.INVALID - case traceId => Some(traceId).toJava - } + Span.fromContext(otelContext).getSpanContext.getTraceId match { + case "00000000000000000000000000000000" => + None // when no traceId returns io.opentelemetry.api.trace.TraceId.INVALID + case traceId => Some(traceId) } - - override def traceParent(): Optional[String] = getScala(Telemetry.TRACE_PARENT_KEY).toJava - - override def traceState(): Optional[String] = getScala(Telemetry.TRACE_STATE_KEY).toJava } override def merge(other: Metadata): Metadata = { diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala index 3ae544a27..077cebb25 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala @@ -58,7 +58,10 @@ import akka.javasdk.view.View import akka.javasdk.workflow.Workflow import akka.javasdk.workflow.WorkflowContext import akka.javasdk.JwtClaims +import akka.javasdk.Tracing import akka.javasdk.impl.http.JwtClaimsImpl +import akka.javasdk.impl.telemetry.SpanTracingImpl +import akka.javasdk.impl.telemetry.TraceInstrumentation import akka.runtime.sdk.spi.ComponentClients import akka.runtime.sdk.spi.HttpEndpointConstructionContext import akka.runtime.sdk.spi.HttpEndpointDescriptor @@ -75,7 +78,7 @@ import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import io.opentelemetry.api.trace.Span import io.opentelemetry.api.trace.Tracer -import io.opentelemetry.context.Context +import io.opentelemetry.context.{ Context => OtelContext } import kalix.protocol.action.Actions import kalix.protocol.discovery.Discovery import kalix.protocol.event_sourced_entity.EventSourcedEntities @@ -267,6 +270,8 @@ private final class Sdk( private val applicationConfig = ApplicationConfig(system).getConfig private val sdkSettings = Settings(applicationConfig.getConfig("akka.javasdk")) + private val sdkTracerFactory = () => tracerFactory(TraceInstrumentation.InstrumentationScopeName) + private val httpClientProvider = new HttpClientProviderImpl( system, None, @@ -351,7 +356,6 @@ private final class Sdk( case h if h == classOf[HttpClientProvider] => httpClientProvider(span) case t if t == classOf[TimerScheduler] => timerScheduler(span) case m if m == classOf[Materializer] => sdkMaterializer - case s if s == classOf[Span] => span.getOrElse(Span.current()) } // FIXME mixing runtime config with sdk with user project config is tricky @@ -380,7 +384,7 @@ private final class Sdk( actionAndConsumerServices, runtimeComponentClients.timerClient, sdkExecutionContext, - tracerFactory)) + sdkTracerFactory)) } services.groupBy(_._2.getClass).foreach { @@ -393,23 +397,23 @@ private final class Sdk( eventSourcedServices, sdkSettings, sdkDispatcherName, - tracerFactory) + sdkTracerFactory) eventSourcedEntitiesEndpoint = Some(eventSourcedImpl) case (serviceClass, entityServices: Map[String, KeyValueEntityService[_, _]] @unchecked) if serviceClass == classOf[KeyValueEntityService[_, _]] => - valueEntitiesEndpoint = - Some(new KeyValueEntitiesImpl(classicSystem, entityServices, sdkSettings, sdkDispatcherName, tracerFactory)) + valueEntitiesEndpoint = Some( + new KeyValueEntitiesImpl(classicSystem, entityServices, sdkSettings, sdkDispatcherName, sdkTracerFactory)) case (serviceClass, workflowServices: Map[String, WorkflowService[_, _]] @unchecked) if serviceClass == classOf[WorkflowService[_, _]] => workflowEntitiesEndpoint = Some( new WorkflowImpl( - classicSystem, workflowServices, runtimeComponentClients.timerClient, sdkExecutionContext, - sdkDispatcherName)) + sdkDispatcherName, + sdkTracerFactory)) case (serviceClass, _: Map[String, TimedActionService[_]] @unchecked) if serviceClass == classOf[TimedActionService[_]] => @@ -570,6 +574,8 @@ private final class Sdk( throw new RuntimeException( "There are no JWT claims defined but trying accessing the JWT claims. The class or the method needs to be annotated with @JWT.") } + + override def tracing(): Tracing = new SpanTracingImpl(context.openTelemetrySpan, sdkTracerFactory) } } } @@ -652,7 +658,7 @@ private final class Sdk( private def httpClientProvider(openTelemetrySpan: Option[Span]): HttpClientProvider = openTelemetrySpan match { case None => httpClientProvider - case Some(span) => httpClientProvider.withTraceContext(Context.current().`with`(span)) + case Some(span) => httpClientProvider.withTraceContext(OtelContext.current().`with`(span)) } } diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/action/ActionsImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/action/ActionsImpl.scala index 87bfc14e4..fd9148f31 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/action/ActionsImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/action/ActionsImpl.scala @@ -9,6 +9,7 @@ import akka.NotUsed import akka.actor.ActorSystem import akka.annotation.InternalApi import akka.javasdk.Metadata +import akka.javasdk.Tracing import akka.javasdk.consumer.Consumer import akka.javasdk.consumer.MessageContext import akka.javasdk.consumer.MessageEnvelope @@ -24,6 +25,7 @@ import akka.javasdk.impl.telemetry.ActionCategory import akka.javasdk.impl.telemetry.ConsumerCategory import akka.javasdk.impl.telemetry.Telemetry import akka.javasdk.impl.telemetry.TraceInstrumentation +import akka.javasdk.impl.telemetry.SpanTracingImpl import akka.javasdk.impl.timedaction.TimedActionService import akka.javasdk.impl.timer.TimerSchedulerImpl import akka.javasdk.timedaction.CommandContext @@ -33,7 +35,7 @@ import akka.javasdk.timer.TimerScheduler import akka.runtime.sdk.spi.TimerClient import akka.stream.scaladsl.Source import io.grpc.Status -import io.opentelemetry.api.trace.SpanContext +import io.opentelemetry.api.trace.Span import io.opentelemetry.api.trace.Tracer import kalix.protocol.action.ActionCommand import kalix.protocol.action.ActionResponse @@ -95,7 +97,7 @@ private[akka] final class ActionsImpl( services: Map[String, Service], timerClient: TimerClient, sdkExecutionContext: ExecutionContext, - tracerFactory: String => Tracer) + tracerFactory: () => Tracer) extends Actions { import ActionsImpl._ @@ -173,7 +175,7 @@ private[akka] final class ActionsImpl( val fut = try { val messageContext = - createMessageContext(in, service.messageCodec, span.map(_.getSpanContext), service.componentId) + createMessageContext(in, service.messageCodec, span, service.componentId) val decodedPayload = service.messageCodec.decodeMessage( in.payload.getOrElse(throw new IllegalArgumentException("No command payload"))) val effect = service @@ -199,7 +201,7 @@ private[akka] final class ActionsImpl( val fut = try { val messageContext = - createConsumerMessageContext(in, service.messageCodec, span.map(_.getSpanContext), service.componentId) + createConsumerMessageContext(in, service.messageCodec, span, service.componentId) val decodedPayload = service.messageCodec.decodeMessage( in.payload.getOrElse(throw new IllegalArgumentException("No command payload"))) val effect = service @@ -225,21 +227,21 @@ private[akka] final class ActionsImpl( private def createMessageContext( in: ActionCommand, messageCodec: MessageCodec, - spanContext: Option[SpanContext], + span: Option[Span], serviceName: String): CommandContext = { val metadata = MetadataImpl.of(in.metadata.map(_.entries.toVector).getOrElse(Nil)) - val updatedMetadata = spanContext.map(metadata.withTracing).getOrElse(metadata) - new CommandContextImpl(updatedMetadata, messageCodec, system, timerClient, telemetries(serviceName)) + val updatedMetadata = span.map(metadata.withTracing).getOrElse(metadata) + new CommandContextImpl(updatedMetadata, messageCodec, system, timerClient, tracerFactory, span) } private def createConsumerMessageContext( in: ActionCommand, messageCodec: MessageCodec, - spanContext: Option[SpanContext], + span: Option[Span], serviceName: String): MessageContext = { val metadata = MetadataImpl.of(in.metadata.map(_.entries.toVector).getOrElse(Nil)) - val updatedMetadata = spanContext.map(metadata.withTracing).getOrElse(metadata) - new MessageContextImpl(updatedMetadata, messageCodec, timerClient, telemetries(serviceName)) + val updatedMetadata = span.map(metadata.withTracing).getOrElse(metadata) + new MessageContextImpl(updatedMetadata, messageCodec, timerClient, tracerFactory, span) } override def handleStreamedIn(in: Source[ActionCommand, NotUsed]): Future[ActionResponse] = { @@ -265,7 +267,8 @@ class CommandContextImpl( val messageCodec: MessageCodec, val system: ActorSystem, timerClient: TimerClient, - instrumentation: TraceInstrumentation) + tracerFactory: () => Tracer, + span: Option[Span]) extends AbstractContext with CommandContext { @@ -283,7 +286,5 @@ class CommandContextImpl( } } - override def getTracer: Tracer = - instrumentation.getTracer - + override def tracing(): Tracing = new SpanTracingImpl(span, tracerFactory) } diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumersImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumersImpl.scala index 73fe1f71e..a0bcfaa33 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumersImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/consumer/ConsumersImpl.scala @@ -6,6 +6,7 @@ package akka.javasdk.impl.consumer import akka.annotation.InternalApi import akka.javasdk.Metadata +import akka.javasdk.Tracing import akka.javasdk.consumer.Consumer import akka.javasdk.consumer.MessageContext import akka.javasdk.consumer.MessageEnvelope @@ -15,11 +16,12 @@ import akka.javasdk.impl.JsonMessageCodec import akka.javasdk.impl.MessageCodec import akka.javasdk.impl.MetadataImpl import akka.javasdk.impl.Service +import akka.javasdk.impl.telemetry.SpanTracingImpl import akka.javasdk.impl.telemetry.Telemetry -import akka.javasdk.impl.telemetry.TraceInstrumentation import akka.javasdk.impl.timer.TimerSchedulerImpl import akka.javasdk.timer.TimerScheduler import akka.runtime.sdk.spi.TimerClient +import io.opentelemetry.api.trace.Span import io.opentelemetry.api.trace.Tracer import kalix.protocol.action.Actions import kalix.protocol.component.MetadataEntry @@ -62,7 +64,8 @@ private[impl] final class MessageContextImpl( override val metadata: Metadata, val messageCodec: MessageCodec, timerClient: TimerClient, - instrumentation: TraceInstrumentation) + tracerFactory: () => Tracer, + span: Option[Span]) extends AbstractContext with MessageContext { @@ -86,7 +89,5 @@ private[impl] final class MessageContextImpl( } } - override def getTracer: Tracer = - instrumentation.getTracer - + override def tracing(): Tracing = new SpanTracingImpl(span, tracerFactory) } diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntitiesImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntitiesImpl.scala index 26e857979..3412e5371 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntitiesImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/eventsourcedentity/EventSourcedEntitiesImpl.scala @@ -16,6 +16,7 @@ import akka.javasdk.impl.ErrorHandling.BadRequestException import EventSourcedEntityRouter.CommandResult import akka.annotation.InternalApi import akka.javasdk.Metadata +import akka.javasdk.Tracing import akka.javasdk.eventsourcedentity.CommandContext import akka.javasdk.eventsourcedentity.EventContext import akka.javasdk.eventsourcedentity.EventSourcedEntity @@ -32,8 +33,10 @@ import akka.javasdk.impl.effect.ErrorReplyImpl import akka.javasdk.impl.effect.MessageReplyImpl import akka.javasdk.impl.effect.SecondaryEffectImpl import akka.javasdk.impl.telemetry.EventSourcedEntityCategory +import akka.javasdk.impl.telemetry.SpanTracingImpl import akka.javasdk.impl.telemetry.Telemetry import akka.javasdk.impl.telemetry.TraceInstrumentation +import io.opentelemetry.api.trace.Span import io.opentelemetry.api.trace.Tracer import kalix.protocol.component.Failure import kalix.protocol.event_sourced_entity.EventSourcedStreamIn.Message.{ Command => InCommand } @@ -79,7 +82,7 @@ private[impl] final class EventSourcedEntitiesImpl( _services: Map[String, EventSourcedEntityService[_, _, _]], configuration: Settings, sdkDispatcherName: String, - tracerFactory: String => Tracer) + tracerFactory: () => Tracer) extends EventSourcedEntities { import akka.javasdk.impl.EntityExceptions._ @@ -186,7 +189,7 @@ private[impl] final class EventSourcedEntitiesImpl( ScalaPbAny.defaultInstance.withTypeUrl(AnySupport.JsonTypeUrlPrefix).withValue(ByteString.empty()))) val metadata = MetadataImpl.of(command.metadata.map(_.entries.toVector).getOrElse(Nil)) val context = - new CommandContextImpl(thisEntityId, sequence, command.name, command.id, metadata) + new CommandContextImpl(thisEntityId, sequence, command.name, command.id, metadata, span, tracerFactory) val CommandResult( events: Vector[Any], @@ -280,10 +283,14 @@ private[impl] final class EventSourcedEntitiesImpl( override val sequenceNumber: Long, override val commandName: String, override val commandId: Long, - override val metadata: Metadata) + override val metadata: Metadata, + span: Option[Span], + tracerFactory: () => Tracer) extends AbstractContext with CommandContext - with ActivatableContext + with ActivatableContext { + override def tracing(): Tracing = new SpanTracingImpl(span, tracerFactory) + } private class EventSourcedEntityContextImpl(override final val entityId: String) extends AbstractContext diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/keyvalueentity/KeyValueEntitiesImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/keyvalueentity/KeyValueEntitiesImpl.scala index dfc2db548..9893093a0 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/keyvalueentity/KeyValueEntitiesImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/keyvalueentity/KeyValueEntitiesImpl.scala @@ -36,10 +36,13 @@ import org.slf4j.MDC import scala.language.existentials import scala.util.control.NonFatal import akka.javasdk.Metadata +import akka.javasdk.Tracing import akka.javasdk.impl.AnySupport import akka.javasdk.impl.effect.MessageReplyImpl import akka.javasdk.impl.keyvalueentity.KeyValueEntityEffectImpl.UpdateState import akka.javasdk.impl.keyvalueentity.KeyValueEntityRouter.CommandResult +import akka.javasdk.impl.telemetry.SpanTracingImpl +import io.opentelemetry.api.trace.Span import kalix.protocol.value_entity.ValueEntityAction.Action.Delete import kalix.protocol.value_entity.ValueEntityAction.Action.Update import kalix.protocol.value_entity.ValueEntityStreamIn.Message.{ Command => InCommand } @@ -71,7 +74,7 @@ private[impl] final class KeyValueEntitiesImpl( val services: Map[String, KeyValueEntityService[_, _]], configuration: Settings, sdkDispatcherName: String, - tracerFactory: String => Tracer) + tracerFactory: () => Tracer) extends ValueEntities { import akka.javasdk.impl.EntityExceptions._ @@ -155,9 +158,9 @@ private[impl] final class KeyValueEntitiesImpl( case InCommand(command) => val metadata = MetadataImpl.of(command.metadata.map(_.entries.toVector).getOrElse(Nil)) - + val instrumentation = instrumentations(service.componentId) if (log.isTraceEnabled) log.trace("Metadata entries [{}].", metadata.entries) - val span = instrumentations(service.componentId).buildSpan(service, command) + val span = instrumentation.buildSpan(service, command) span.foreach(s => MDC.put(Telemetry.TRACE_ID, s.getSpanContext.getTraceId)) try { @@ -167,7 +170,7 @@ private[impl] final class KeyValueEntitiesImpl( // FIXME smuggling 0 arity method called from component client through here ScalaPbAny.defaultInstance.withTypeUrl(AnySupport.JsonTypeUrlPrefix).withValue(ByteString.empty()))) val context = - new CommandContextImpl(thisEntityId, command.name, command.id, metadata, system) + new CommandContextImpl(thisEntityId, command.name, command.id, metadata, span, tracerFactory) val (CommandResult(effect: KeyValueEntityEffectImpl[_]), errorCode) = try { @@ -243,10 +246,15 @@ private[impl] final class CommandContextImpl( override val commandName: String, override val commandId: Long, override val metadata: Metadata, - system: ActorSystem) + span: Option[Span], + tracerFactory: () => Tracer) extends AbstractContext with CommandContext - with ActivatableContext + with ActivatableContext { + + override def tracing(): Tracing = + new SpanTracingImpl(span, tracerFactory) +} /** * INTERNAL API diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/telemetry/SpanTracingImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/telemetry/SpanTracingImpl.scala new file mode 100644 index 000000000..589964be3 --- /dev/null +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/telemetry/SpanTracingImpl.scala @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2021-2024 Lightbend Inc. + */ + +package akka.javasdk.impl.telemetry + +import akka.annotation.InternalApi +import akka.javasdk.Tracing +import io.opentelemetry.api.trace.Span +import io.opentelemetry.api.trace.Tracer +import io.opentelemetry.context.{ Context => OtelContext } + +import java.util.Optional +import scala.jdk.OptionConverters.RichOption + +/** + * INTERNAL API + */ +@InternalApi +final class SpanTracingImpl(span: Option[Span], tracerFactory: () => Tracer) extends Tracing { + override def startSpan(name: String): Optional[Span] = + span.map { s => + val parent = OtelContext.current().`with`(s) + tracerFactory() + .spanBuilder("ad-hoc span") + .setParent(parent) + .startSpan() + }.toJava + + override def parentSpan(): Optional[Span] = span.toJava +} diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/telemetry/Telemetry.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/telemetry/Telemetry.scala index 814a21321..a8b8669d8 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/telemetry/Telemetry.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/telemetry/Telemetry.scala @@ -113,7 +113,7 @@ private[akka] object TraceInstrumentation { override def keys(carrier: MetadataEntry): lang.Iterable[String] = Collections.singleton(Telemetry.TRACE_PARENT_KEY) } - private val InstrumentationScopeName = "akka-javasdk" + val InstrumentationScopeName: String = "akka-javasdk" } /** @@ -123,7 +123,7 @@ private[akka] object TraceInstrumentation { private[akka] final class TraceInstrumentation( componentName: String, componentCategory: ComponentCategory, - tracerFactory: String => Tracer) { + val tracerFactory: () => Tracer) { import Telemetry._ import TraceInstrumentation._ @@ -135,7 +135,7 @@ private[akka] final class TraceInstrumentation( s"${componentCategory.name}: $simpleComponentName" } - private val tracer = getTracer + private val tracer = tracerFactory() private val enabled = tracer != OpenTelemetry.noop().getTracer(InstrumentationScopeName) /** @@ -168,7 +168,7 @@ private[akka] final class TraceInstrumentation( val spanName = s"$traceNamePrefix.${removeSyntheticName(commandName)}" var spanBuilder = - getTracer + tracer .spanBuilder(spanName) .setParent(parentContext) .setSpanKind(SpanKind.SERVER) @@ -179,8 +179,6 @@ private[akka] final class TraceInstrumentation( } } - def getTracer: Tracer = tracerFactory(InstrumentationScopeName) - private def removeSyntheticName(maybeSyntheticName: String): String = maybeSyntheticName .replace("KalixSyntheticMethodOnES", "") diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/view/ViewsImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/view/ViewsImpl.scala index 11a703831..027fe536e 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/view/ViewsImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/view/ViewsImpl.scala @@ -23,8 +23,6 @@ import com.google.protobuf.any.{ Any => ScalaPbAny } import org.slf4j.LoggerFactory import org.slf4j.MDC -import scala.jdk.OptionConverters._ - /** * INTERNAL API */ @@ -97,7 +95,7 @@ final class ViewsImpl(_services: Map[String, ViewService[_]], sdkDispatcherName: val commandName = receiveEvent.commandName val msg = service.messageCodec.decodeMessage(receiveEvent.payload.get) val metadata = MetadataImpl.of(receiveEvent.metadata.map(_.entries.toVector).getOrElse(Nil)) - val addedToMDC = metadata.traceContext.traceId().toScala match { + val addedToMDC = metadata.traceId match { case Some(traceId) => MDC.put(Telemetry.TRACE_ID, traceId) true diff --git a/akka-javasdk/src/main/scala/akka/javasdk/impl/workflow/WorkflowImpl.scala b/akka-javasdk/src/main/scala/akka/javasdk/impl/workflow/WorkflowImpl.scala index 9b55aaebc..ca37d5410 100644 --- a/akka-javasdk/src/main/scala/akka/javasdk/impl/workflow/WorkflowImpl.scala +++ b/akka-javasdk/src/main/scala/akka/javasdk/impl/workflow/WorkflowImpl.scala @@ -4,47 +4,44 @@ package akka.javasdk.impl.workflow -import java.util.Optional -import scala.concurrent.ExecutionContext -import scala.concurrent.Future -import scala.jdk.OptionConverters._ -import scala.language.existentials -import scala.util.control.NonFatal import akka.NotUsed -import akka.actor.ActorSystem -import akka.javasdk.impl.ErrorHandling.BadRequestException -import akka.javasdk.impl.WorkflowExceptions.ProtocolException -import akka.javasdk.impl.WorkflowExceptions.WorkflowException -import akka.javasdk.impl.WorkflowExceptions.failureMessageForLog -import WorkflowEffectImpl.DeleteState -import WorkflowEffectImpl.End -import WorkflowEffectImpl.ErrorEffectImpl -import WorkflowEffectImpl.NoPersistence -import WorkflowEffectImpl.NoReply -import WorkflowEffectImpl.NoTransition -import WorkflowEffectImpl.Pause -import WorkflowEffectImpl.Persistence -import WorkflowEffectImpl.Reply -import WorkflowEffectImpl.ReplyValue -import WorkflowEffectImpl.StepTransition -import WorkflowEffectImpl.TransitionalEffectImpl -import WorkflowEffectImpl.UpdateState -import WorkflowRouter.CommandResult +import akka.annotation.InternalApi +import akka.javasdk.Metadata +import akka.javasdk.Tracing import akka.javasdk.impl.AbstractContext import akka.javasdk.impl.ActivatableContext +import akka.javasdk.impl.AnySupport import akka.javasdk.impl.ErrorHandling +import akka.javasdk.impl.ErrorHandling.BadRequestException import akka.javasdk.impl.JsonMessageCodec import akka.javasdk.impl.MessageCodec import akka.javasdk.impl.MetadataImpl import akka.javasdk.impl.Service import akka.javasdk.impl.StrictJsonMessageCodec +import akka.javasdk.impl.WorkflowExceptions.ProtocolException +import akka.javasdk.impl.WorkflowExceptions.WorkflowException +import akka.javasdk.impl.WorkflowExceptions.failureMessageForLog +import akka.javasdk.impl.telemetry.SpanTracingImpl import akka.javasdk.impl.timer.TimerSchedulerImpl +import akka.javasdk.impl.workflow.WorkflowEffectImpl.DeleteState +import akka.javasdk.impl.workflow.WorkflowEffectImpl.End +import akka.javasdk.impl.workflow.WorkflowEffectImpl.ErrorEffectImpl +import akka.javasdk.impl.workflow.WorkflowEffectImpl.NoPersistence +import akka.javasdk.impl.workflow.WorkflowEffectImpl.NoReply +import akka.javasdk.impl.workflow.WorkflowEffectImpl.NoTransition +import akka.javasdk.impl.workflow.WorkflowEffectImpl.Pause +import akka.javasdk.impl.workflow.WorkflowEffectImpl.Persistence +import akka.javasdk.impl.workflow.WorkflowEffectImpl.Reply +import akka.javasdk.impl.workflow.WorkflowEffectImpl.ReplyValue +import akka.javasdk.impl.workflow.WorkflowEffectImpl.StepTransition +import akka.javasdk.impl.workflow.WorkflowEffectImpl.TransitionalEffectImpl +import akka.javasdk.impl.workflow.WorkflowEffectImpl.UpdateState +import akka.javasdk.impl.workflow.WorkflowRouter.CommandResult import akka.javasdk.workflow.CommandContext import akka.javasdk.workflow.Workflow -import akka.runtime.sdk.spi.TimerClient -import Workflow.WorkflowDef -import akka.annotation.InternalApi +import akka.javasdk.workflow.Workflow.WorkflowDef import akka.javasdk.workflow.WorkflowContext +import akka.runtime.sdk.spi.TimerClient import akka.stream.scaladsl.Flow import akka.stream.scaladsl.Source import com.google.protobuf.ByteString @@ -52,6 +49,8 @@ import com.google.protobuf.any.{ Any => ScalaPbAny } import com.google.protobuf.duration import com.google.protobuf.duration.Duration import io.grpc.Status +import io.opentelemetry.api.trace.Span +import io.opentelemetry.api.trace.Tracer import kalix.protocol.component import kalix.protocol.component.{ Reply => ProtoReply } import kalix.protocol.workflow_entity.RecoverStrategy @@ -62,9 +61,9 @@ import kalix.protocol.workflow_entity.WorkflowEffect import kalix.protocol.workflow_entity.WorkflowEntities import kalix.protocol.workflow_entity.WorkflowEntityInit import kalix.protocol.workflow_entity.WorkflowStreamIn +import kalix.protocol.workflow_entity.WorkflowStreamIn.Message import kalix.protocol.workflow_entity.WorkflowStreamIn.Message.Empty import kalix.protocol.workflow_entity.WorkflowStreamIn.Message.Init -import kalix.protocol.workflow_entity.WorkflowStreamIn.Message import kalix.protocol.workflow_entity.WorkflowStreamIn.Message.Step import kalix.protocol.workflow_entity.WorkflowStreamIn.Message.Transition import kalix.protocol.workflow_entity.WorkflowStreamIn.Message.{ Command => InCommand } @@ -75,10 +74,14 @@ import kalix.protocol.workflow_entity.{ NoTransition => ProtoNoTransition } import kalix.protocol.workflow_entity.{ Pause => ProtoPause } import kalix.protocol.workflow_entity.{ StepTransition => ProtoStepTransition } import org.slf4j.LoggerFactory -import akka.javasdk.Metadata -import akka.javasdk.impl.AnySupport +import java.util.Optional +import scala.concurrent.ExecutionContext +import scala.concurrent.Future import scala.jdk.CollectionConverters._ +import scala.jdk.OptionConverters._ +import scala.language.existentials +import scala.util.control.NonFatal /** * INTERNAL API @@ -102,11 +105,11 @@ final class WorkflowService[S, W <: Workflow[S]]( */ @InternalApi final class WorkflowImpl( - system: ActorSystem, val services: Map[String, WorkflowService[_, _]], timerClient: TimerClient, sdkExcutionContext: ExecutionContext, - sdkDispatcherName: String) + sdkDispatcherName: String, + tracerFactory: () => Tracer) extends kalix.protocol.workflow_entity.WorkflowEntities { private implicit val ec: ExecutionContext = sdkExcutionContext @@ -287,7 +290,15 @@ final class WorkflowImpl( case InCommand(command) => val metadata = MetadataImpl.of(command.metadata.map(_.entries.toVector).getOrElse(Nil)) - val context = new CommandContextImpl(workflowId, command.name, command.id, metadata, system) + val context = + new CommandContextImpl( + workflowId, + command.name, + command.id, + metadata, + // FIXME we'd need to start a parent span for the command here to have one to base custom user spans of off? + None, + tracerFactory) val timerScheduler = new TimerSchedulerImpl(service.strictMessageCodec, timerClient, context.componentCallMetadata) @@ -314,7 +325,14 @@ final class WorkflowImpl( case Step(executeStep) => val context = - new CommandContextImpl(workflowId, executeStep.stepName, executeStep.commandId, Metadata.EMPTY, system) + new CommandContextImpl( + workflowId, + executeStep.stepName, + executeStep.commandId, + Metadata.EMPTY, + // FIXME we'd need to start a parent span for the step here to have one to base custom user spans of off? + None, + tracerFactory) val timerScheduler = new TimerSchedulerImpl(service.strictMessageCodec, timerClient, context.componentCallMetadata) val stepResponse = @@ -391,10 +409,15 @@ private[akka] final class CommandContextImpl( override val commandName: String, override val commandId: Long, override val metadata: Metadata, - system: ActorSystem) + span: Option[Span], + tracerFactory: () => Tracer) extends AbstractContext with CommandContext - with ActivatableContext + with ActivatableContext { + + override def tracing(): Tracing = + new SpanTracingImpl(span, tracerFactory) +} /** * INTERNAL API diff --git a/akka-javasdk/src/test/java/akka/javasdk/timedaction/TestTracing.java b/akka-javasdk/src/test/java/akka/javasdk/timedaction/TestTracing.java index e0ffa2e6a..0c5f6c70e 100644 --- a/akka-javasdk/src/test/java/akka/javasdk/timedaction/TestTracing.java +++ b/akka-javasdk/src/test/java/akka/javasdk/timedaction/TestTracing.java @@ -8,6 +8,9 @@ import akka.javasdk.consumer.Consumer; import akka.javasdk.eventsourcedentity.TestESEvent; import akka.javasdk.eventsourcedentity.TestEventSourcedEntity; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.TextMapSetter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import akka.javasdk.annotations.ComponentId; @@ -20,7 +23,20 @@ public class TestTracing extends Consumer { public Effect consume(TestESEvent.Event2 event) { logger.info("registering a logging event"); - return effects().produce( - messageContext().metadata().traceContext().traceParent().orElse("not-found")); + + // test expects a w3c encoded trace parent so here are some hoops to get that + // FIXME if this turns out to be a common need we could provide the w3c encoded traceparent from Tracing + // but for now leaving otel hoops to users is fine enough + String[] w3cEncodedTraceParent = {"not-enabled"}; + messageContext().tracing().parentSpan().ifPresent(span -> { + var contextWithSpan = Context.current().with(span); + W3CTraceContextPropagator.getInstance().inject(contextWithSpan, null, + (carrier, key, value) -> { + if (key.equals("traceparent")) { + w3cEncodedTraceParent[0] = value; + } + }); + }); + return effects().produce(w3cEncodedTraceParent[0]); } } diff --git a/akka-javasdk/src/test/scala/akka/javasdk/impl/ConsumersImplSpec.scala b/akka-javasdk/src/test/scala/akka/javasdk/impl/ConsumersImplSpec.scala index 820aaffbf..1ef1f09cd 100644 --- a/akka-javasdk/src/test/scala/akka/javasdk/impl/ConsumersImplSpec.scala +++ b/akka-javasdk/src/test/scala/akka/javasdk/impl/ConsumersImplSpec.scala @@ -56,7 +56,9 @@ class ConsumersImplSpec private val classicSystem = system.toClassic - def create(service: ConsumerService[_], tracerFactory: String => Tracer = OpenTelemetry.noop.getTracer _): Actions = { + def create( + service: ConsumerService[_], + tracerFactory: () => Tracer = () => OpenTelemetry.noop.getTracer("test")): Actions = { new ActionsImpl( classicSystem, Map(service.descriptor.getFullName -> service), @@ -117,7 +119,7 @@ class ConsumersImplSpec .builder() .build() - val service = create(consumerProvider, openTelemetryInstance.getTracer) + val service = create(consumerProvider, () => openTelemetryInstance.getTracer("test")) val serviceName = consumerProvider.descriptor.getFullName val cmd1 = ScalaPbAny.fromJavaProto(JsonSupport.encodeJson(new TestESEvent.Event2(123))) diff --git a/akka-javasdk/src/test/scala/akka/javasdk/impl/action/TimedActionHandlerSpec.scala b/akka-javasdk/src/test/scala/akka/javasdk/impl/action/TimedActionHandlerSpec.scala index 0e6b3bb67..49e8d124b 100644 --- a/akka-javasdk/src/test/scala/akka/javasdk/impl/action/TimedActionHandlerSpec.scala +++ b/akka-javasdk/src/test/scala/akka/javasdk/impl/action/TimedActionHandlerSpec.scala @@ -68,7 +68,7 @@ class TimedActionHandlerSpec override def removeTimer(name: String): Future[Done] = ??? }, classicSystem.dispatcher, - OpenTelemetry.noop().getTracer _) + () => OpenTelemetry.noop().getTracer("test")) } "The action service" should { diff --git a/docs/src/modules/java/pages/setup-and-dependency-injection.adoc b/docs/src/modules/java/pages/setup-and-dependency-injection.adoc index e6f396a21..bb1380728 100644 --- a/docs/src/modules/java/pages/setup-and-dependency-injection.adoc +++ b/docs/src/modules/java/pages/setup-and-dependency-injection.adoc @@ -57,7 +57,7 @@ Furthermore, the following component specific types can also be injected: | Injectable classes | Endpoint -|`io.opentelemetry.api.trace.Span` for creating custom traces +|`akka.javasdk.http.RequestContext` with access to request related things. |Workflow |`akka.javasdk.workflow.WorkflowContext` for access to the workflow id |Event Sourced Entity diff --git a/samples/tracing/README.md b/samples/tracing/README.md index 8184a8b29..16e06a173 100644 --- a/samples/tracing/README.md +++ b/samples/tracing/README.md @@ -20,35 +20,38 @@ mvn compile ## Running Locally -When running an Akka service locally. -To start your service locally, run: +First start a local Jaeger in docker using the prepared docker compose file from the sample project directory: ```shell -TRACING_ENABLED=true COLLECTOR_ENDPOINT="http://localhost:4317" mvn compile exec:java +docker compose up ``` -This command will start your Akka service, with tracing enabled and exporting the generated -traces to the Jaeger container referred below. - -To start Jaeger locally, run: +Then start your service locally, with tracing enabled and reporting to the local Jaeger instance: ```shell -docker compose up +TRACING_ENABLED=true COLLECTOR_ENDPOINT="http://localhost:4317" mvn compile exec:java ``` ## Exercising the service With your Akka service running, any defined endpoints should be available at `http://localhost:9000`. -- Add a new user +Report a custom span around an async task inside an endpoint: + +```shell +curl -i -XPOST localhost:9000/tracing/custom/5 +``` + +Schedule a timed action which reports a custom span when executing an async call to an external service: ```shell - curl -i -XPOST -H "Content-Type: application/json" localhost:9000/tracing -d '{"id":"2454cb46-1b16-408a-b7f8-bd2d5c376969"}' +curl -i -XPOST -H "Content-Type: application/json" localhost:9000/tracing -d '{"id":"2454cb46-1b16-408a-b7f8-bd2d5c376969"}' ``` +Now you can see the trace in Jaeger UI at http://localhost:16686 -- Now you can see the trace in Jaeger UI at http://localhost:16686 - - select "runtime" and "Find all traces" to explore the trace +Select "runtime" and "Find all traces" to explore the traces, you should see "POST /tracing/custom/{id}" and "POST /tracing/" +for the respective two calls above. ## Deploying diff --git a/samples/tracing/docker-compose.yml b/samples/tracing/docker-compose.yml index 979333f06..4e7f4459b 100644 --- a/samples/tracing/docker-compose.yml +++ b/samples/tracing/docker-compose.yml @@ -1,4 +1,3 @@ -version: "3" services: jaeger: image: jaegertracing/all-in-one:1.54 diff --git a/samples/tracing/src/main/java/com/example/tracing/api/TracingEndpoint.java b/samples/tracing/src/main/java/com/example/tracing/api/TracingEndpoint.java index 4eee3a99c..6d50a5f9d 100644 --- a/samples/tracing/src/main/java/com/example/tracing/api/TracingEndpoint.java +++ b/samples/tracing/src/main/java/com/example/tracing/api/TracingEndpoint.java @@ -1,26 +1,38 @@ package com.example.tracing.api; import akka.Done; +import akka.http.javadsl.model.HttpResponse; +import akka.javasdk.annotations.Acl; import akka.javasdk.annotations.http.HttpEndpoint; import akka.javasdk.annotations.http.Post; import akka.javasdk.client.ComponentClient; -import com.example.tracing.application.TracingAction; +import akka.javasdk.http.HttpResponses; +import akka.javasdk.http.RequestContext; import akka.javasdk.timer.TimerScheduler; +import com.example.tracing.application.TracingAction; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; import java.time.Duration; +import java.util.Optional; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +// Opened up for access from the public internet to make the sample service easy to try out. +// For actual services meant for production this must be carefully considered, and often set more limited +@Acl(allow = @Acl.Matcher(principal = Acl.Principal.INTERNET)) @HttpEndpoint("/tracing") public class TracingEndpoint { private final ComponentClient componentClient; private final TimerScheduler timerScheduler; + private final RequestContext requestContext; - - public TracingEndpoint(ComponentClient componentClient, TimerScheduler timerScheduler) { + public TracingEndpoint(ComponentClient componentClient, TimerScheduler timerScheduler, RequestContext requestContext) { this.componentClient = componentClient; this.timerScheduler = timerScheduler; + this.requestContext = requestContext; } private record PostId(String id){} @@ -35,4 +47,34 @@ public CompletionStage postDelayed(PostId id) { ); } + @Post("/custom/{id}") + public CompletionStage customSpan(String id) { + Optional maybeSpan = requestContext.tracing().startSpan("ad-hoc endpoint span"); + + maybeSpan.ifPresent(span -> span.setAttribute("id", id)); + + // do some stuff + // potentially async, might throw + var result = CompletableFuture.supplyAsync(() -> { + maybeSpan.ifPresent(span -> span.addEvent("Spawned async task")); + return Integer.valueOf(id); + }); + + return result.handle((ok, error) -> { + if (error != null) { + maybeSpan.ifPresent(span -> { + span.setStatus(StatusCode.ERROR, error.getMessage()); + span.end(); + }); + return HttpResponses.internalServerError("Boom"); + } else { + maybeSpan.ifPresent(span -> { + span.setStatus(StatusCode.OK); + span.end(); + }); + return HttpResponses.ok("Ok!"); + } + }); + } + } diff --git a/samples/tracing/src/main/java/com/example/tracing/application/TracingAction.java b/samples/tracing/src/main/java/com/example/tracing/application/TracingAction.java index af3a1160d..3793c10f7 100644 --- a/samples/tracing/src/main/java/com/example/tracing/application/TracingAction.java +++ b/samples/tracing/src/main/java/com/example/tracing/application/TracingAction.java @@ -1,37 +1,43 @@ package com.example.tracing.application; - import akka.javasdk.annotations.ComponentId; import akka.javasdk.timedaction.TimedAction; -import com.example.tracing.domain.Typicode; +import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.StatusCode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.http.HttpResponse; +import java.util.Optional; +import java.util.concurrent.CompletionStage; + @ComponentId("tracing-action") public class TracingAction extends TimedAction { - private final static Logger logger = LoggerFactory.getLogger(TracingAction.class); - - private final Typicode typicode = new Typicode(); - - public Effect callAnotherService(String postID){ - logger.info("Calling to [{}].", Typicode.url + "/" + postID); - var newSpan = commandContext().getTracer() - .spanBuilder("ad-hoc span calling to: " + Typicode.url) - .setParent(commandContext().metadata().traceContext().asOpenTelemetryContext()) - .startSpan() - .setAttribute("post", postID); - - return effects().asyncEffect( - typicode.callAsyncService(postID) - .whenComplete((response, ex) -> { - if (ex != null) { - newSpan.setStatus(StatusCode.ERROR, ex.getMessage()).end(); - } else { - newSpan.setAttribute("result", response.body().title()).end(); - } - }) - .thenApply(__ -> effects().done() )); - } + private final static Logger logger = LoggerFactory.getLogger(TracingAction.class); + + private final Typicode typicode = new Typicode(); + + public Effect callAnotherService(String postID) { + logger.info("Calling to [{}].", Typicode.url + "/" + postID); + Optional maybeSpan = commandContext().tracing().startSpan("ad-hoc span calling to: " + Typicode.url); + + maybeSpan.ifPresent(span -> span.setAttribute("post", postID)); + + CompletionStage> asyncResult = typicode.callAsyncService(postID, maybeSpan); + + maybeSpan.ifPresent(span -> + asyncResult.whenComplete((response, ex) -> { + + if (ex != null) { + span.setStatus(StatusCode.ERROR, ex.getMessage()).end(); + } else { + span.setAttribute("response-status", response.statusCode()).end(); + } + }) + ); + + + return effects().asyncEffect(asyncResult.thenApply(__ -> effects().done())); + } } \ No newline at end of file diff --git a/samples/tracing/src/main/java/com/example/tracing/domain/Typicode.java b/samples/tracing/src/main/java/com/example/tracing/application/Typicode.java similarity index 52% rename from samples/tracing/src/main/java/com/example/tracing/domain/Typicode.java rename to samples/tracing/src/main/java/com/example/tracing/application/Typicode.java index 81fc101ea..b23622a87 100644 --- a/samples/tracing/src/main/java/com/example/tracing/domain/Typicode.java +++ b/samples/tracing/src/main/java/com/example/tracing/application/Typicode.java @@ -1,33 +1,59 @@ -package com.example.tracing.domain; +package com.example.tracing.application; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Charsets; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.TextMapSetter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; +import java.util.Optional; import java.util.concurrent.CompletionStage; public class Typicode { + + private static final Logger log = LoggerFactory.getLogger(Typicode.class); + public static final String url = "https://jsonplaceholder.typicode.com/posts"; + // using a third party HTTP client here rather than the built in `akka.javasdk.http.HttpClient` - // in order to showcase external/manual tracing + // in order to showcase external/manual tracing and propagating context manually private final HttpClient httpClient = HttpClient.newHttpClient(); + private final TextMapSetter setter = + (carrier, key, value) -> carrier.setHeader(key, value); + public record TypicodePost(String userId, String id, String title, String body) {} - public CompletionStage> callAsyncService(String postID) { - HttpRequest httpRequest = HttpRequest.newBuilder() - .uri(URI.create(url + "/" + postID)) - .build(); + public CompletionStage> callAsyncService(String postID, Optional parentSpan) { + var requestBuilder = HttpRequest.newBuilder() + .uri(URI.create(url + "/" + postID)); + + parentSpan.ifPresent(span -> { + // propagate trace parent to third party service + var contextWithSpan = Context.current().with(span); + W3CTraceContextPropagator.getInstance().inject(contextWithSpan, requestBuilder, setter); + }); + + HttpRequest httpRequest = requestBuilder.build(); + + parentSpan.ifPresent(__ -> { + log.info("Request headers propagating open telemetry trace parent: {}", httpRequest.headers().toString()); + }); + //Async call to external service return httpClient.sendAsync(httpRequest, new JsonResponseHandler<>(TypicodePost.class)); } - public static class JsonResponseHandler implements HttpResponse.BodyHandler { + private static class JsonResponseHandler implements HttpResponse.BodyHandler { private final Class responseType; public JsonResponseHandler(Class responseType){