diff --git a/composer.json b/composer.json index 7ac30bc1..b8936b9e 100644 --- a/composer.json +++ b/composer.json @@ -11,7 +11,8 @@ }, "require-dev": { "composer/xdebug-handler": "^2.0", - "open-telemetry/dev-tools": "dev-main" + "open-telemetry/dev-tools": "dev-main", + "phpunit/phpunit": "^11.4" }, "autoload": { "psr-4": { diff --git a/src/Instrumentation/Symfony/src/MessengerInstrumentation.php b/src/Instrumentation/Symfony/src/MessengerInstrumentation.php index 9b0b8e60..ccdae7b4 100644 --- a/src/Instrumentation/Symfony/src/MessengerInstrumentation.php +++ b/src/Instrumentation/Symfony/src/MessengerInstrumentation.php @@ -36,10 +36,9 @@ public static function register(): void { $instrumentation = new CachedInstrumentation('io.opentelemetry.contrib.php.symfony_messenger'); - /** - * MessageBusInterface dispatches messages to the handlers. - */ - hook( + + // Instrument MessageBusInterface (message dispatching) + hook( MessageBusInterface::class, 'dispatch', pre: static function ( @@ -54,19 +53,17 @@ public static function register(): void $message = $params[0]; $messageClass = \get_class($message); - /** @psalm-suppress ArgumentTypeCoercion */ + // Instrument dispatch as a "send" operation with SpanKind::KIND_PRODUCER $builder = $instrumentation ->tracer() - ->spanBuilder(\sprintf('DISPATCH %s', $messageClass)) - ->setSpanKind(SpanKind::KIND_PRODUCER) + ->spanBuilder(\sprintf('publish %s', $messageClass)) + ->setSpanKind(SpanKind::KIND_PRODUCER) // Set KIND_PRODUCER for dispatch ->setAttribute(TraceAttributes::CODE_FUNCTION, $function) ->setAttribute(TraceAttributes::CODE_NAMESPACE, $class) ->setAttribute(TraceAttributes::CODE_FILEPATH, $filename) ->setAttribute(TraceAttributes::CODE_LINENO, $lineno) - ->setAttribute(self::ATTRIBUTE_MESSENGER_BUS, $class) - ->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass) - ; + ->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass); $parent = Context::getCurrent(); $span = $builder @@ -103,9 +100,7 @@ public static function register(): void } ); - /** - * SenderInterface sends messages to a transport. - */ + // Instrument SenderInterface (sending messages to transport) hook( SenderInterface::class, 'send', @@ -121,28 +116,88 @@ public static function register(): void $envelope = $params[0]; $messageClass = \get_class($envelope->getMessage()); - /** @psalm-suppress ArgumentTypeCoercion */ + // Instrument sending as a "send" operation with SpanKind::KIND_PRODUCER + $builder = $instrumentation + ->tracer() + ->spanBuilder(\sprintf('send %s', $messageClass)) + ->setSpanKind(SpanKind::KIND_PRODUCER) // Set KIND_PRODUCER for sending + ->setAttribute(TraceAttributes::CODE_FUNCTION, $function) + ->setAttribute(TraceAttributes::CODE_NAMESPACE, $class) + ->setAttribute(TraceAttributes::CODE_FILEPATH, $filename) + ->setAttribute(TraceAttributes::CODE_LINENO, $lineno) + ->setAttribute(self::ATTRIBUTE_MESSENGER_TRANSPORT, $class) + ->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass); + + $parent = Context::getCurrent(); + $span = $builder + ->setParent($parent) + ->startSpan(); + + $context = $span->storeInContext($parent); + + Context::storage()->attach($context); + + return $params; + }, + post: static function ( + SenderInterface $sender, + array $params, + ?Envelope $result, + ?\Throwable $exception + ): void { + $scope = Context::storage()->scope(); + if (null === $scope) { + return; + } + + $scope->detach(); + $span = Span::fromContext($scope->context()); + + if (null !== $exception) { + $span->recordException($exception, [ + TraceAttributes::EXCEPTION_ESCAPED => true, + ]); + $span->setStatus(StatusCode::STATUS_ERROR, $exception->getMessage()); + } + + $span->end(); + } + ); + + // Instrument the receiving of messages (consumer-side) + hook( + SenderInterface::class, + 'receive', + pre: static function ( + SenderInterface $bus, + array $params, + string $class, + string $function, + ?string $filename, + ?int $lineno, + ) use ($instrumentation): array { + /** @var Envelope $envelope */ + $envelope = $params[0]; + $messageClass = \get_class($envelope->getMessage()); + + // Instrument receiving as a "consume" operation with SpanKind::KIND_CONSUMER $builder = $instrumentation ->tracer() - ->spanBuilder(\sprintf('SEND %s', $messageClass)) - ->setSpanKind(SpanKind::KIND_PRODUCER) + ->spanBuilder(\sprintf('consume %s', $messageClass)) + ->setSpanKind(SpanKind::KIND_CONSUMER) // Set KIND_CONSUMER for receiving ->setAttribute(TraceAttributes::CODE_FUNCTION, $function) ->setAttribute(TraceAttributes::CODE_NAMESPACE, $class) ->setAttribute(TraceAttributes::CODE_FILEPATH, $filename) ->setAttribute(TraceAttributes::CODE_LINENO, $lineno) - ->setAttribute(self::ATTRIBUTE_MESSENGER_TRANSPORT, $class) - ->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass) - ; + ->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass); $parent = Context::getCurrent(); - $span = $builder ->setParent($parent) ->startSpan(); $context = $span->storeInContext($parent); - Context::storage()->attach($context); return $params; diff --git a/src/Instrumentation/Symfony/tests/Integration/MessengerInstrumentationTest.php b/src/Instrumentation/Symfony/tests/Integration/MessengerInstrumentationTest.php index 63d38849..347a88de 100644 --- a/src/Instrumentation/Symfony/tests/Integration/MessengerInstrumentationTest.php +++ b/src/Instrumentation/Symfony/tests/Integration/MessengerInstrumentationTest.php @@ -35,6 +35,7 @@ protected function getMessenger(): MessageBusInterface { return new MessageBus(); } + protected function getTransport() { // Symfony 6+ @@ -56,7 +57,6 @@ protected function getTransport() public function test_dispatch_message($message, string $spanName, int $kind, array $attributes) { $bus = $this->getMessenger(); - $bus->dispatch($message); $this->assertCount(1, $this->storage); @@ -99,6 +99,62 @@ public function test_send_message($message, string $spanName, int $kind, array $ } } + /** + * Test consumer span when processing a message + */ + public function test_consume_message() + { + $transport = $this->getTransport(); + $message = new SendEmailMessage('Hello Consumer'); + $envelope = new Envelope($message); + + // Simulate receiving the message via the transport + $transport->send($envelope); + + // Simulate message consumption (processing) + $bus = $this->getMessenger(); + $bus->dispatch($message); + + // After message is consumed, we expect a consumer span + $this->assertCount(1, $this->storage); + + /** @var ImmutableSpan $span */ + $span = $this->storage[0]; + + // We expect this to be a consumer span + $this->assertEquals('CONSUME OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration\SendEmailMessage', $span->getName()); + $this->assertEquals(SpanKind::KIND_CONSUMER, $span->getKind()); + + $this->assertTrue($span->getAttributes()->has(MessengerInstrumentation::ATTRIBUTE_MESSENGER_MESSAGE)); + $this->assertEquals('OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration\SendEmailMessage', $span->getAttributes()->get(MessengerInstrumentation::ATTRIBUTE_MESSENGER_MESSAGE)); + } + + /** + * @dataProvider sendDataProvider + * @param mixed $message + * @param string $spanName + * @param int $kind + * @param array $attributes + */ + public function test_send_message($message, string $spanName, int $kind, array $attributes) + { + $transport = $this->getTransport(); + $transport->send(new Envelope($message)); + + $this->assertCount(1, $this->storage); + + /** @var ImmutableSpan $span */ + $span = $this->storage[0]; + + $this->assertEquals($spanName, $span->getName()); + $this->assertEquals($kind, $span->getKind()); + + foreach ($attributes as $key => $value) { + $this->assertTrue($span->getAttributes()->has($key), sprintf('Attribute %s not found', $key)); + $this->assertEquals($value, $span->getAttributes()->get($key)); + } + } + public function test_can_sustain_throw_while_dispatching() { $bus = new class() implements MessageBusInterface {