diff --git a/benchmarks/src/main/scala/org/typelevel/otel4s/benchmarks/MultiSpanProcessorBenchmark.scala b/benchmarks/src/main/scala/org/typelevel/otel4s/benchmarks/MultiSpanProcessorBenchmark.scala new file mode 100644 index 000000000..bf1457c67 --- /dev/null +++ b/benchmarks/src/main/scala/org/typelevel/otel4s/benchmarks/MultiSpanProcessorBenchmark.scala @@ -0,0 +1,142 @@ +/* + * Copyright 2022 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.typelevel.otel4s.benchmarks + +import cats.effect.IO +import cats.syntax.foldable._ +import io.opentelemetry.context.Context +import io.opentelemetry.sdk.trace.ReadWriteSpan +import org.openjdk.jmh.annotations._ + +import java.util.concurrent.TimeUnit +import scala.concurrent.duration._ + +// benchmarks/Jmh/run org.typelevel.otel4s.benchmarks.MultiSpanProcessorBenchmark -prof gc +@State(Scope.Benchmark) +@BenchmarkMode(Array(Mode.AverageTime)) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@Fork(1) +@Warmup(iterations = 5, time = 1) +@Measurement(iterations = 10, time = 1) +class MultiSpanProcessorBenchmark { + + import MultiSpanProcessorBenchmark._ + + @Param(Array("oteljava", "sdk")) + var backend: String = _ + + @Param(Array("1", "5", "10")) + var processorCount: Int = _ + + @Param(Array("1")) + var spanCount: Int = _ + + private var processor: Processor = _ + + @Benchmark + def onEnd(): Unit = + processor.onEnd() + + @Setup(Level.Trial) + def setup(): Unit = + backend match { + case "oteljava" => + processor = Processor.otelJava(processorCount) + + case "sdk" => + processor = Processor.sdk(processorCount) + + case other => + sys.error(s"unknown backend [$other]") + } + +} + +object MultiSpanProcessorBenchmark { + + trait Processor { + def onEnd(): Unit + } + + object Processor { + + def otelJava(processorCount: Int): Processor = { + import io.opentelemetry.api.trace.Span + import io.opentelemetry.sdk.trace.{ReadableSpan, SdkTracerProvider} + import io.opentelemetry.sdk.trace.SpanProcessor + import scala.jdk.CollectionConverters._ + + val tracer = SdkTracerProvider.builder().build().get("benchmarkTracer") + + val span: Span = + tracer.spanBuilder("span").startSpan() + + val processor = new SpanProcessor { + def onStart(parentContext: Context, span: ReadWriteSpan): Unit = () + def isStartRequired: Boolean = true + def onEnd(span: ReadableSpan): Unit = () + def isEndRequired: Boolean = true + } + + val proc = SpanProcessor.composite(List.fill(processorCount)(processor).asJava) + + new Processor { + def onEnd(): Unit = + proc.onEnd(span.asInstanceOf[ReadableSpan]) + } + } + + def sdk(processorCount: Int): Processor = { + import cats.effect.unsafe.implicits.global + import org.typelevel.otel4s.trace.{SpanContext, SpanKind} + import org.typelevel.otel4s.sdk.TelemetryResource + import org.typelevel.otel4s.sdk.common.InstrumentationScope + import org.typelevel.otel4s.sdk.trace.data.{LimitedData, SpanData, StatusData} + import org.typelevel.otel4s.sdk.trace.processor.SpanProcessor + + val processor: SpanProcessor[IO] = new SpanProcessor[IO] { + def name: String = "Noop" + val onStart: SpanProcessor.OnStart[IO] = (_, _) => IO.unit + val onEnd: SpanProcessor.OnEnd[IO] = _ => IO.unit + def forceFlush: IO[Unit] = IO.unit + } + + val spanData = SpanData( + name = "span", + spanContext = SpanContext.invalid, + parentSpanContext = None, + kind = SpanKind.Internal, + startTimestamp = Duration.Zero, + endTimestamp = None, + status = StatusData.Ok, + attributes = LimitedData.attributes(Int.MaxValue, 1024), + events = LimitedData.events(Int.MaxValue), + links = LimitedData.links(Int.MaxValue), + instrumentationScope = InstrumentationScope.empty, + resource = TelemetryResource.empty + ) + + val proc = List.fill(processorCount)(processor).combineAll + + new Processor { + def onEnd(): Unit = proc.onEnd(spanData).unsafeRunSync() + } + } + + } + +} diff --git a/build.sbt b/build.sbt index ae7aa96a5..4354b26b0 100644 --- a/build.sbt +++ b/build.sbt @@ -1,6 +1,6 @@ import com.typesafe.tools.mima.core._ -ThisBuild / tlBaseVersion := "0.11" +ThisBuild / tlBaseVersion := "0.12" ThisBuild / organization := "org.typelevel" ThisBuild / organizationName := "Typelevel" diff --git a/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/SdkSpanBuilder.scala b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/SdkSpanBuilder.scala index 2fac9d64c..64ff32bd6 100644 --- a/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/SdkSpanBuilder.scala +++ b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/SdkSpanBuilder.scala @@ -90,9 +90,7 @@ private final case class SdkSpanBuilder[F[_]: Temporal: Console] private ( def release(backend: Span.Backend[F], ec: Resource.ExitCase): F[Unit] = for { - _ <- state.finalizationStrategy - .lift(ec) - .foldMapM(SpanFinalizer.run(backend, _)) + _ <- state.finalizationStrategy.lift(ec).foldMapM(SpanFinalizer.run(backend, _)) _ <- backend.end } yield () diff --git a/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/SdkTracerProvider.scala b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/SdkTracerProvider.scala index b46dd06cf..b7ddc55ab 100644 --- a/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/SdkTracerProvider.scala +++ b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/SdkTracerProvider.scala @@ -158,9 +158,7 @@ object SdkTracerProvider { /** Creates a new [[Builder]] with default configuration. */ - def builder[ - F[_]: Temporal: Parallel: Random: LocalContext: Console - ]: Builder[F] = + def builder[F[_]: Temporal: Parallel: Random: LocalContext: Console]: Builder[F] = BuilderImpl[F]( idGenerator = IdGenerator.random, resource = TelemetryResource.default, @@ -170,9 +168,7 @@ object SdkTracerProvider { spanProcessors = Nil ) - private final case class BuilderImpl[ - F[_]: Temporal: Parallel: LocalContext: Console - ]( + private final case class BuilderImpl[F[_]: Temporal: Parallel: LocalContext: Console]( idGenerator: IdGenerator[F], resource: TelemetryResource, spanLimits: SpanLimits, diff --git a/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/processor/BatchSpanProcessor.scala b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/processor/BatchSpanProcessor.scala index 7f3ece7a5..9a20012bc 100644 --- a/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/processor/BatchSpanProcessor.scala +++ b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/processor/BatchSpanProcessor.scala @@ -30,7 +30,6 @@ import cats.effect.syntax.temporal._ import cats.syntax.all._ import org.typelevel.otel4s.sdk.trace.data.SpanData import org.typelevel.otel4s.sdk.trace.exporter.SpanExporter -import org.typelevel.otel4s.trace.SpanContext import scala.concurrent.duration._ @@ -67,13 +66,10 @@ private final class BatchSpanProcessor[F[_]: Temporal: Console] private ( s"maxQueueSize=${config.maxQueueSize}, " + s"maxExportBatchSize=${config.maxExportBatchSize}}" - val isStartRequired: Boolean = false - val isEndRequired: Boolean = true + val onStart: SpanProcessor.OnStart[F] = + SpanProcessor.OnStart.noop - def onStart(parentContext: Option[SpanContext], span: SpanRef[F]): F[Unit] = - unit - - def onEnd(span: SpanData): F[Unit] = + val onEnd: SpanProcessor.OnEnd[F] = { (span: SpanData) => if (span.spanContext.isSampled) { // if 'spansNeeded' is defined, it means the worker is waiting for a certain number of spans // and it waits for the 'signal'-latch to be released @@ -92,6 +88,7 @@ private final class BatchSpanProcessor[F[_]: Temporal: Console] private ( } else { unit } + } def forceFlush: F[Unit] = exportAll diff --git a/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/processor/SimpleSpanProcessor.scala b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/processor/SimpleSpanProcessor.scala index 65ca5c653..475078aff 100644 --- a/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/processor/SimpleSpanProcessor.scala +++ b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/processor/SimpleSpanProcessor.scala @@ -22,7 +22,6 @@ import cats.effect.std.Console import cats.syntax.applicativeError._ import org.typelevel.otel4s.sdk.trace.data.SpanData import org.typelevel.otel4s.sdk.trace.exporter.SpanExporter -import org.typelevel.otel4s.trace.SpanContext /** An implementation of the [[SpanProcessor]] that passes ended [[data.SpanData SpanData]] directly to the configured * exporter. @@ -44,13 +43,10 @@ private final class SimpleSpanProcessor[F[_]: MonadThrow: Console] private ( val name: String = s"SimpleSpanProcessor{exporter=${exporter.name}, exportOnlySampled=$exportOnlySampled}" - val isStartRequired: Boolean = false - val isEndRequired: Boolean = true + val onStart: SpanProcessor.OnStart[F] = + SpanProcessor.OnStart.noop - def onStart(parentContext: Option[SpanContext], span: SpanRef[F]): F[Unit] = - MonadThrow[F].unit - - def onEnd(span: SpanData): F[Unit] = { + val onEnd: SpanProcessor.OnEnd[F] = { (span: SpanData) => val canExport = !exportOnlySampled || span.spanContext.isSampled if (canExport) doExport(span) else MonadThrow[F].unit } diff --git a/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/processor/SpanProcessor.scala b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/processor/SpanProcessor.scala index c7191ce71..3af12e993 100644 --- a/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/processor/SpanProcessor.scala +++ b/sdk/trace/src/main/scala/org/typelevel/otel4s/sdk/trace/processor/SpanProcessor.scala @@ -51,36 +51,19 @@ trait SpanProcessor[F[_]] { /** Called when a span is started, if the `span.isRecording` returns true. * - * This method is called synchronously on the execution thread, should not throw or block the execution thread. + * The handler is called synchronously on the execution thread, should not throw or block the execution thread. * - * @param parentContext - * the optional parent [[trace.SpanContext SpanContext]] - * - * @param span - * the started span - */ - def onStart(parentContext: Option[SpanContext], span: SpanRef[F]): F[Unit] - - /** Whether the [[SpanProcessor]] requires start events. - * - * If true, the [[onStart]] will be called upon the start of a span. + * Use [[SpanProcessor.OnStart.noop]] to define a noop operation if start events aren't required by the processor. */ - def isStartRequired: Boolean + def onStart: SpanProcessor.OnStart[F] /** Called when a span is ended, if the `span.isRecording` returns true. * - * This method is called synchronously on the execution thread, should not throw or block the execution thread. + * The handler is called synchronously on the execution thread, should not throw or block the execution thread. * - * @param span - * the ended span + * Use [[SpanProcessor.OnStart.noop]] to define a noop operation if end events aren't required by the processor. */ - def onEnd(span: SpanData): F[Unit] - - /** Whether the [[SpanProcessor]] requires end events. - * - * If true, the [[onEnd]] will be called upon the end of a span. - */ - def isEndRequired: Boolean + def onEnd: SpanProcessor.OnEnd[F] /** Processes all pending spans (if any). */ @@ -92,11 +75,68 @@ trait SpanProcessor[F[_]] { object SpanProcessor { + /** Evaluated when a span is started. + * + * @see + * [[https://opentelemetry.io/docs/specs/otel/trace/sdk/#onstart]] + */ + trait OnStart[F[_]] { + + /** Called when a span is started, if the `span.isRecording` returns true. + * + * This method is called synchronously on the execution thread, should not throw or block the execution thread. + * + * @param parentContext + * the optional parent [[trace.SpanContext SpanContext]] + * + * @param span + * the started span + */ + def apply(parentContext: Option[SpanContext], span: SpanRef[F]): F[Unit] + } + + object OnStart { + + def noop[F[_]: Applicative]: OnStart[F] = + new Noop + + private[SpanProcessor] final class Noop[F[_]: Applicative] extends OnStart[F] { + private val unit: F[Unit] = Applicative[F].unit + def apply(parentContext: Option[SpanContext], span: SpanRef[F]): F[Unit] = unit + } + } + + /** Evaluated when a span is ended. + * + * @see + * [[https://opentelemetry.io/docs/specs/otel/trace/sdk/#onendspan]] + */ + trait OnEnd[F[_]] { + + /** Called when a span is ended, if the `span.isRecording` returns true. + * + * This method is called synchronously on the execution thread, should not throw or block the execution thread. + * + * @param span + * the ended span + */ + def apply(span: SpanData): F[Unit] + } + + object OnEnd { + + def noop[F[_]: Applicative]: OnEnd[F] = + new Noop + + private[SpanProcessor] final class Noop[F[_]: Applicative] extends OnEnd[F] { + private val unit: F[Unit] = Applicative[F].unit + def apply(span: SpanData): F[Unit] = unit + } + } + /** Creates a [[SpanProcessor]] which delegates all processing to the processors in order. */ - def of[F[_]: MonadThrow: Parallel]( - processors: SpanProcessor[F]* - ): SpanProcessor[F] = + def of[F[_]: MonadThrow: Parallel](processors: SpanProcessor[F]*): SpanProcessor[F] = if (processors.sizeIs == 1) processors.head else processors.combineAll @@ -161,55 +201,64 @@ object SpanProcessor { private final class Noop[F[_]: Applicative] extends SpanProcessor[F] { val name: String = "SpanProcessor.Noop" - - def isStartRequired: Boolean = false - def isEndRequired: Boolean = false - - def onStart(parentCtx: Option[SpanContext], span: SpanRef[F]): F[Unit] = - Applicative[F].unit - - def onEnd(span: SpanData): F[Unit] = - Applicative[F].unit - - def forceFlush: F[Unit] = - Applicative[F].unit + val onStart: OnStart[F] = OnStart.noop + val onEnd: OnEnd[F] = OnEnd.noop + val forceFlush: F[Unit] = Applicative[F].unit } private final case class Multi[F[_]: MonadThrow: Parallel]( processors: NonEmptyList[SpanProcessor[F]] ) extends SpanProcessor[F] { - private val startOnly: List[SpanProcessor[F]] = - processors.filter(_.isStartRequired) - - private val endOnly: List[SpanProcessor[F]] = - processors.filter(_.isEndRequired) - val name: String = s"SpanProcessor.Multi(${processors.map(_.name).mkString_(", ")})" - def isStartRequired: Boolean = startOnly.nonEmpty - def isEndRequired: Boolean = endOnly.nonEmpty + /** We use 'traverse' instead of 'parTraverse' due to: + * + * If multiple SpanProcessors are registered, their OnStart callbacks are invoked in the order they have been + * registered. + * + * Source: https://opentelemetry.io/docs/specs/otel/trace/sdk/#onstart + */ + val onStart: OnStart[F] = { + val start = processors.filter { p => + p.onStart match { + case _: OnStart.Noop[_] => false + case _ => true + } + } + + if (start.nonEmpty) { (parentContext, span) => + start + .traverse(p => p.onStart(parentContext, span).attempt.tupleLeft(p.name)) + .flatMap(attempts => handleAttempts(attempts)) + } else { + OnStart.noop + } + } - def onStart(parentCtx: Option[SpanContext], span: SpanRef[F]): F[Unit] = - startOnly - .parTraverse { p => - p.onStart(parentCtx, span).attempt.tupleLeft(p.name) + val onEnd: OnEnd[F] = { + val end = processors.filter { p => + p.onEnd match { + case _: OnEnd.Noop[_] => false + case _ => true } - .flatMap(attempts => handleAttempts(attempts)) + } - def onEnd(span: SpanData): F[Unit] = - endOnly - .parTraverse(p => p.onEnd(span).attempt.tupleLeft(p.name)) - .flatMap(attempts => handleAttempts(attempts)) + if (end.nonEmpty) { span => + end + .parTraverse(p => p.onEnd(span).attempt.tupleLeft(p.name)) + .flatMap(attempts => handleAttempts(attempts)) + } else { + OnEnd.noop + } + } def forceFlush: F[Unit] = processors .parTraverse(p => p.forceFlush.attempt.tupleLeft(p.name)) .flatMap(attempts => handleAttempts(attempts.toList)) - private def handleAttempts( - results: List[(String, Either[Throwable, Unit])] - ): F[Unit] = { + private def handleAttempts(results: List[(String, Either[Throwable, Unit])]): F[Unit] = { val failures = results.collect { case (processor, Left(failure)) => ProcessorFailure(processor, failure) } diff --git a/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/SdkSpanBackendSuite.scala b/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/SdkSpanBackendSuite.scala index c4f1af78d..82cd8f390 100644 --- a/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/SdkSpanBackendSuite.scala +++ b/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/SdkSpanBackendSuite.scala @@ -484,23 +484,15 @@ class SdkSpanBackendSuite extends CatsEffectSuite with ScalaCheckEffectSuite { } } - private def startEndRecorder( - start: Queue[IO, SpanData], - end: Queue[IO, SpanData] - ): SpanProcessor[IO] = + private def startEndRecorder(start: Queue[IO, SpanData], end: Queue[IO, SpanData]): SpanProcessor[IO] = new SpanProcessor[IO] { val name: String = "InMemorySpanProcessor" - val isStartRequired: Boolean = true - val isEndRequired: Boolean = true - def onStart( - parentContext: Option[SpanContext], - span: SpanRef[IO] - ): IO[Unit] = - span.toSpanData.flatMap(d => start.offer(d)) + val onStart: SpanProcessor.OnStart[IO] = + (_, span) => span.toSpanData.flatMap(d => start.offer(d)) - def onEnd(span: SpanData): IO[Unit] = - end.offer(span) + val onEnd: SpanProcessor.OnEnd[IO] = + span => end.offer(span) def forceFlush: IO[Unit] = IO.unit diff --git a/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/processor/SpanProcessorSuite.scala b/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/processor/SpanProcessorSuite.scala index 62cad2c01..c66df8bbb 100644 --- a/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/processor/SpanProcessorSuite.scala +++ b/sdk/trace/src/test/scala/org/typelevel/otel4s/sdk/trace/processor/SpanProcessorSuite.scala @@ -20,8 +20,6 @@ import cats.data.NonEmptyList import cats.effect.IO import munit.FunSuite import org.typelevel.otel4s.sdk.trace.SpanRef -import org.typelevel.otel4s.sdk.trace.data.SpanData -import org.typelevel.otel4s.trace.SpanContext class SpanProcessorSuite extends FunSuite { @@ -136,26 +134,10 @@ class SpanProcessorSuite extends FunSuite { flush: IO[Unit] = IO.unit, ): SpanProcessor[IO] = new SpanProcessor[IO] { - def name: String = - processorName - - def onStart( - parentContext: Option[SpanContext], - span: SpanRef[IO] - ): IO[Unit] = - start - - def isStartRequired: Boolean = - false - - def onEnd(span: SpanData): IO[Unit] = - end - - def isEndRequired: Boolean = - false - - def forceFlush: IO[Unit] = - flush + def name: String = processorName + def onStart: SpanProcessor.OnStart[IO] = (_, _) => start + def onEnd: SpanProcessor.OnEnd[IO] = _ => end + def forceFlush: IO[Unit] = flush } }