Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIND::CONSUMER instrumentation implemented #307

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
},
"require-dev": {
"composer/xdebug-handler": "^2.0",
"open-telemetry/dev-tools": "dev-main"
"open-telemetry/dev-tools": "dev-main",
"phpunit/phpunit": "^11.4"
RichardChukwu marked this conversation as resolved.
Show resolved Hide resolved
},
"autoload": {
"psr-4": {
Expand Down
99 changes: 84 additions & 15 deletions src/Instrumentation/Symfony/src/MessengerInstrumentation.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,9 @@ public static function register(): void
{
$instrumentation = new CachedInstrumentation('io.opentelemetry.contrib.php.symfony_messenger');

/**
* MessageBusInterface dispatches messages to the handlers.
*/
hook(

// Instrument MessageBusInterface (message dispatching)
hook(
MessageBusInterface::class,
'dispatch',
pre: static function (
Expand All @@ -54,19 +53,17 @@ public static function register(): void
$message = $params[0];
$messageClass = \get_class($message);

/** @psalm-suppress ArgumentTypeCoercion */
// Instrument dispatch as a "send" operation with SpanKind::KIND_PRODUCER
$builder = $instrumentation
->tracer()
->spanBuilder(\sprintf('DISPATCH %s', $messageClass))
->setSpanKind(SpanKind::KIND_PRODUCER)
->spanBuilder(\sprintf('publish %s', $messageClass))
->setSpanKind(SpanKind::KIND_PRODUCER) // Set KIND_PRODUCER for dispatch
->setAttribute(TraceAttributes::CODE_FUNCTION, $function)
->setAttribute(TraceAttributes::CODE_NAMESPACE, $class)
->setAttribute(TraceAttributes::CODE_FILEPATH, $filename)
->setAttribute(TraceAttributes::CODE_LINENO, $lineno)

->setAttribute(self::ATTRIBUTE_MESSENGER_BUS, $class)
->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass)
;
->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass);

$parent = Context::getCurrent();
$span = $builder
Expand Down Expand Up @@ -103,9 +100,7 @@ public static function register(): void
}
);

/**
* SenderInterface sends messages to a transport.
*/
// Instrument SenderInterface (sending messages to transport)
hook(
SenderInterface::class,
'send',
Expand All @@ -121,8 +116,19 @@ public static function register(): void
$envelope = $params[0];
$messageClass = \get_class($envelope->getMessage());

/** @psalm-suppress ArgumentTypeCoercion */
// Instrument sending as a "send" operation with SpanKind::KIND_PRODUCER
$builder = $instrumentation
<<<<<<< HEAD
RichardChukwu marked this conversation as resolved.
Show resolved Hide resolved
->tracer()
->spanBuilder(\sprintf('send %s', $messageClass))
->setSpanKind(SpanKind::KIND_PRODUCER) // Set KIND_PRODUCER for sending
->setAttribute(TraceAttributes::CODE_FUNCTION, $function)
->setAttribute(TraceAttributes::CODE_NAMESPACE, $class)
->setAttribute(TraceAttributes::CODE_FILEPATH, $filename)
->setAttribute(TraceAttributes::CODE_LINENO, $lineno)
->setAttribute(self::ATTRIBUTE_MESSENGER_TRANSPORT, $class)
->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass);
=======
->tracer()
->spanBuilder(\sprintf('SEND %s', $messageClass))
->setSpanKind(SpanKind::KIND_PRODUCER)
Expand All @@ -134,9 +140,9 @@ public static function register(): void
->setAttribute(self::ATTRIBUTE_MESSENGER_TRANSPORT, $class)
->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass)
;
>>>>>>> 78a04cebaeba48d60a00dc1c48653695b926299d

$parent = Context::getCurrent();

$span = $builder
->setParent($parent)
->startSpan();
Expand Down Expand Up @@ -168,6 +174,69 @@ public static function register(): void
$span->setStatus(StatusCode::STATUS_ERROR, $exception->getMessage());
}

$span->end();
}
);

// Instrument the receiving of messages (consumer-side)
hook(
SenderInterface::class,
RichardChukwu marked this conversation as resolved.
Show resolved Hide resolved
'receive',
pre: static function (
SenderInterface $bus,
array $params,
string $class,
string $function,
?string $filename,
?int $lineno,
) use ($instrumentation): array {
/** @var Envelope $envelope */
$envelope = $params[0];
$messageClass = \get_class($envelope->getMessage());

// Instrument receiving as a "consume" operation with SpanKind::KIND_CONSUMER
$builder = $instrumentation
->tracer()
->spanBuilder(\sprintf('consume %s', $messageClass))
->setSpanKind(SpanKind::KIND_CONSUMER) // Set KIND_CONSUMER for receiving
->setAttribute(TraceAttributes::CODE_FUNCTION, $function)
->setAttribute(TraceAttributes::CODE_NAMESPACE, $class)
->setAttribute(TraceAttributes::CODE_FILEPATH, $filename)
->setAttribute(TraceAttributes::CODE_LINENO, $lineno)
->setAttribute(self::ATTRIBUTE_MESSENGER_TRANSPORT, $class)
->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass);

$parent = Context::getCurrent();
$span = $builder
->setParent($parent)
->startSpan();

$context = $span->storeInContext($parent);
Context::storage()->attach($context);

return $params;
},
post: static function (
SenderInterface $sender,
array $params,
?Envelope $result,
?\Throwable $exception
): void {
$scope = Context::storage()->scope();
if (null === $scope) {
return;
}

$scope->detach();
$span = Span::fromContext($scope->context());

if (null !== $exception) {
$span->recordException($exception, [
TraceAttributes::EXCEPTION_ESCAPED => true,
]);
$span->setStatus(StatusCode::STATUS_ERROR, $exception->getMessage());
}

$span->end();
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ protected function getMessenger(): MessageBusInterface
{
return new MessageBus();
}

protected function getTransport()
{
// Symfony 6+
Expand All @@ -56,7 +57,6 @@ protected function getTransport()
public function test_dispatch_message($message, string $spanName, int $kind, array $attributes)
{
$bus = $this->getMessenger();

$bus->dispatch($message);

$this->assertCount(1, $this->storage);
Expand Down Expand Up @@ -99,6 +99,62 @@ public function test_send_message($message, string $spanName, int $kind, array $
}
}

/**
* Test consumer span when processing a message
*/
public function test_consume_message()
{
$transport = $this->getTransport();
$message = new SendEmailMessage('Hello Consumer');
$envelope = new Envelope($message);

// Simulate receiving the message via the transport
$transport->send($envelope);

// Simulate message consumption (processing)
$bus = $this->getMessenger();
$bus->dispatch($message);

// After message is consumed, we expect a consumer span
$this->assertCount(1, $this->storage);

/** @var ImmutableSpan $span */
$span = $this->storage[0];

// We expect this to be a consumer span
$this->assertEquals('CONSUME OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration\SendEmailMessage', $span->getName());
$this->assertEquals(SpanKind::KIND_CONSUMER, $span->getKind());

$this->assertTrue($span->getAttributes()->has(MessengerInstrumentation::ATTRIBUTE_MESSENGER_MESSAGE));
$this->assertEquals('OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration\SendEmailMessage', $span->getAttributes()->get(MessengerInstrumentation::ATTRIBUTE_MESSENGER_MESSAGE));
}

/**
* @dataProvider sendDataProvider
* @param mixed $message
* @param string $spanName
* @param int $kind
* @param array $attributes
*/
public function test_send_message($message, string $spanName, int $kind, array $attributes)
{
$transport = $this->getTransport();
$transport->send(new Envelope($message));

$this->assertCount(1, $this->storage);

/** @var ImmutableSpan $span */
$span = $this->storage[0];

$this->assertEquals($spanName, $span->getName());
$this->assertEquals($kind, $span->getKind());

foreach ($attributes as $key => $value) {
$this->assertTrue($span->getAttributes()->has($key), sprintf('Attribute %s not found', $key));
$this->assertEquals($value, $span->getAttributes()->get($key));
}
}

public function test_can_sustain_throw_while_dispatching()
{
$bus = new class() implements MessageBusInterface {
Expand Down
Loading