Skip to content

Commit

Permalink
Message receive (KIND_CONSUMER) instrumentation updated
Browse files Browse the repository at this point in the history
  • Loading branch information
RichardChukwu committed Oct 17, 2024
1 parent 8ad2de6 commit ea5f1b1
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 23 deletions.
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
},
"require-dev": {
"composer/xdebug-handler": "^2.0",
"open-telemetry/dev-tools": "dev-main"
"open-telemetry/dev-tools": "dev-main",
"phpunit/phpunit": "^11.4"
},
"autoload": {
"psr-4": {
Expand Down
97 changes: 76 additions & 21 deletions src/Instrumentation/Symfony/src/MessengerInstrumentation.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,9 @@ public static function register(): void
{
$instrumentation = new CachedInstrumentation('io.opentelemetry.contrib.php.symfony_messenger');

/**
* MessageBusInterface dispatches messages to the handlers.
*/
hook(

// Instrument MessageBusInterface (message dispatching)
hook(
MessageBusInterface::class,
'dispatch',
pre: static function (
Expand All @@ -54,19 +53,17 @@ public static function register(): void
$message = $params[0];
$messageClass = \get_class($message);

/** @psalm-suppress ArgumentTypeCoercion */
// Instrument dispatch as a "send" operation with SpanKind::KIND_PRODUCER
$builder = $instrumentation
->tracer()
->spanBuilder(\sprintf('DISPATCH %s', $messageClass))
->setSpanKind(SpanKind::KIND_PRODUCER)
->spanBuilder(\sprintf('publish %s', $messageClass))
->setSpanKind(SpanKind::KIND_PRODUCER) // Set KIND_PRODUCER for dispatch
->setAttribute(TraceAttributes::CODE_FUNCTION, $function)
->setAttribute(TraceAttributes::CODE_NAMESPACE, $class)
->setAttribute(TraceAttributes::CODE_FILEPATH, $filename)
->setAttribute(TraceAttributes::CODE_LINENO, $lineno)

->setAttribute(self::ATTRIBUTE_MESSENGER_BUS, $class)
->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass)
;
->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass);

$parent = Context::getCurrent();
$span = $builder
Expand Down Expand Up @@ -103,9 +100,7 @@ public static function register(): void
}
);

/**
* SenderInterface sends messages to a transport.
*/
// Instrument SenderInterface (sending messages to transport)
hook(
SenderInterface::class,
'send',
Expand All @@ -121,28 +116,88 @@ public static function register(): void
$envelope = $params[0];
$messageClass = \get_class($envelope->getMessage());

/** @psalm-suppress ArgumentTypeCoercion */
// Instrument sending as a "send" operation with SpanKind::KIND_PRODUCER
$builder = $instrumentation
->tracer()
->spanBuilder(\sprintf('send %s', $messageClass))
->setSpanKind(SpanKind::KIND_PRODUCER) // Set KIND_PRODUCER for sending
->setAttribute(TraceAttributes::CODE_FUNCTION, $function)
->setAttribute(TraceAttributes::CODE_NAMESPACE, $class)
->setAttribute(TraceAttributes::CODE_FILEPATH, $filename)
->setAttribute(TraceAttributes::CODE_LINENO, $lineno)
->setAttribute(self::ATTRIBUTE_MESSENGER_TRANSPORT, $class)
->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass);

$parent = Context::getCurrent();
$span = $builder
->setParent($parent)
->startSpan();

$context = $span->storeInContext($parent);

Context::storage()->attach($context);

return $params;
},
post: static function (
SenderInterface $sender,
array $params,
?Envelope $result,
?\Throwable $exception
): void {
$scope = Context::storage()->scope();
if (null === $scope) {
return;
}

$scope->detach();
$span = Span::fromContext($scope->context());

if (null !== $exception) {
$span->recordException($exception, [
TraceAttributes::EXCEPTION_ESCAPED => true,
]);
$span->setStatus(StatusCode::STATUS_ERROR, $exception->getMessage());
}

$span->end();
}
);

// Instrument the receiving of messages (consumer-side)
hook(
SenderInterface::class,
'receive',
pre: static function (
SenderInterface $bus,
array $params,
string $class,
string $function,
?string $filename,
?int $lineno,
) use ($instrumentation): array {
/** @var Envelope $envelope */
$envelope = $params[0];
$messageClass = \get_class($envelope->getMessage());

// Instrument receiving as a "consume" operation with SpanKind::KIND_CONSUMER
$builder = $instrumentation
->tracer()
->spanBuilder(\sprintf('SEND %s', $messageClass))
->setSpanKind(SpanKind::KIND_PRODUCER)
->spanBuilder(\sprintf('consume %s', $messageClass))
->setSpanKind(SpanKind::KIND_CONSUMER) // Set KIND_CONSUMER for receiving
->setAttribute(TraceAttributes::CODE_FUNCTION, $function)
->setAttribute(TraceAttributes::CODE_NAMESPACE, $class)
->setAttribute(TraceAttributes::CODE_FILEPATH, $filename)
->setAttribute(TraceAttributes::CODE_LINENO, $lineno)

->setAttribute(self::ATTRIBUTE_MESSENGER_TRANSPORT, $class)
->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass)
;
->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass);

$parent = Context::getCurrent();

$span = $builder
->setParent($parent)
->startSpan();

$context = $span->storeInContext($parent);

Context::storage()->attach($context);

return $params;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ protected function getMessenger(): MessageBusInterface
{
return new MessageBus();
}

protected function getTransport()
{
// Symfony 6+
Expand All @@ -56,7 +57,6 @@ protected function getTransport()
public function test_dispatch_message($message, string $spanName, int $kind, array $attributes)
{
$bus = $this->getMessenger();

$bus->dispatch($message);

$this->assertCount(1, $this->storage);
Expand Down Expand Up @@ -99,6 +99,62 @@ public function test_send_message($message, string $spanName, int $kind, array $
}
}

/**
* Test consumer span when processing a message
*/
public function test_consume_message()
{
$transport = $this->getTransport();
$message = new SendEmailMessage('Hello Consumer');
$envelope = new Envelope($message);

// Simulate receiving the message via the transport
$transport->send($envelope);

// Simulate message consumption (processing)
$bus = $this->getMessenger();
$bus->dispatch($message);

// After message is consumed, we expect a consumer span
$this->assertCount(1, $this->storage);

/** @var ImmutableSpan $span */
$span = $this->storage[0];

// We expect this to be a consumer span
$this->assertEquals('CONSUME OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration\SendEmailMessage', $span->getName());
$this->assertEquals(SpanKind::KIND_CONSUMER, $span->getKind());

$this->assertTrue($span->getAttributes()->has(MessengerInstrumentation::ATTRIBUTE_MESSENGER_MESSAGE));
$this->assertEquals('OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration\SendEmailMessage', $span->getAttributes()->get(MessengerInstrumentation::ATTRIBUTE_MESSENGER_MESSAGE));
}

/**
* @dataProvider sendDataProvider
* @param mixed $message
* @param string $spanName
* @param int $kind
* @param array $attributes
*/
public function test_send_message($message, string $spanName, int $kind, array $attributes)
{
$transport = $this->getTransport();
$transport->send(new Envelope($message));

$this->assertCount(1, $this->storage);

/** @var ImmutableSpan $span */
$span = $this->storage[0];

$this->assertEquals($spanName, $span->getName());
$this->assertEquals($kind, $span->getKind());

foreach ($attributes as $key => $value) {
$this->assertTrue($span->getAttributes()->has($key), sprintf('Attribute %s not found', $key));
$this->assertEquals($value, $span->getAttributes()->get($key));
}
}

public function test_can_sustain_throw_while_dispatching()
{
$bus = new class() implements MessageBusInterface {
Expand Down

0 comments on commit ea5f1b1

Please sign in to comment.