Skip to content

Commit

Permalink
Laravel: message processing on the consumer side.
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisLightfootWild committed Apr 8, 2024
1 parent 1a596ff commit 69117c8
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 55 deletions.
64 changes: 9 additions & 55 deletions src/Instrumentation/Laravel/src/Hooks/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,28 @@

use Illuminate\Contracts\Queue\Queue as QueueContract;
use Illuminate\Queue\Queue as AbstractQueue;
use Illuminate\Queue\RedisQueue;
use Illuminate\Queue\SqsQueue;
use OpenTelemetry\API\Trace\Propagation\TraceContextPropagator;
use OpenTelemetry\API\Trace\Span;
use OpenTelemetry\API\Trace\SpanKind;
use OpenTelemetry\API\Trace\StatusCode;
use OpenTelemetry\Context\Context;
use OpenTelemetry\Contrib\Instrumentation\Laravel\Hooks\Queue\AttributesBuilder;
use function OpenTelemetry\Instrumentation\hook;
use OpenTelemetry\SemConv\TraceAttributes;
use OpenTelemetry\SemConv\TraceAttributeValues;
use Throwable;

class Queue
{
use AttributesBuilder;
use HookInstance;

public function instrument(): void
{
$this->hookAbstractQueueCreatePayloadArray();
$this->hookQueuePushRaw();

Queue\Worker::hook($this->instrumentation);
}

protected function hookAbstractQueueCreatePayloadArray(): bool
Expand All @@ -51,7 +54,10 @@ protected function hookQueuePushRaw(): bool
$parent = Context::getCurrent();
$span = $this->instrumentation
->tracer()
->spanBuilder("{$attributes[TraceAttributes::MESSAGING_DESTINATION_NAME]} publish")
->spanBuilder(vsprintf('%s %s', [
$attributes[TraceAttributes::MESSAGING_DESTINATION_NAME] ?? '(anonymous)',
TraceAttributeValues::MESSAGING_OPERATION_PUBLISH,
]))
->setSpanKind(SpanKind::KIND_PRODUCER)
->startSpan()
->setAttributes($attributes);
Expand All @@ -77,56 +83,4 @@ protected function hookQueuePushRaw(): bool
},
);
}

private function buildMessageAttributes(
QueueContract $queue,
string $rawPayload,
string $queueName = null,
array $options = [],
mixed ...$params,
): array {
$payload = json_decode($rawPayload, true) ?? [];

return array_merge([
TraceAttributes::MESSAGING_DESTINATION_NAME => '(anonymous)',
TraceAttributes::MESSAGING_MESSAGE_ID => $payload['uuid'] ?? $payload['id'] ?? null,
TraceAttributes::MESSAGING_MESSAGE_ENVELOPE_SIZE => strlen($rawPayload),
'messaging.message.job_name' => $payload['displayName'] ?? $payload['job'] ?? null,
'messaging.message.attempts' => $payload['attempts'] ?? 0,
'messaging.message.max_exceptions' => $payload['maxExceptions'] ?? null,
'messaging.message.max_tries' => $payload['maxTries'] ?? null,
'messaging.message.retry_until' => $payload['retryUntil'] ?? null,
'messaging.message.timeout' => $payload['timeout'] ?? null,
], $this->contextualMessageSystemAttributes($queue, $payload, $queueName, $options, ...$params));
}

private function contextualMessageSystemAttributes(
QueueContract $queue,
array $payload,
string $queueName = null,
array $options = [],
mixed ...$params,
): array {
return match (true) {
$queue instanceof RedisQueue => $this->redisContextualAttributes($queue, $payload, $queueName, $options, ...$params),
$queue instanceof SqsQueue => $this->awsSqsContextualAttributes($queue, $payload, $queueName, $options, ...$params),
default => [],
};
}

private function redisContextualAttributes(RedisQueue $queue, array $payload, string $queueName = null, array $options = [], mixed ...$params): array
{
return [
TraceAttributes::MESSAGING_SYSTEM => 'redis',
TraceAttributes::MESSAGING_DESTINATION_NAME => $queue->getQueue($queueName),
];
}

private function awsSqsContextualAttributes(SqsQueue $queue, array $payload, string $queueName = null, array $options = [], mixed ...$params): array
{
return [
TraceAttributes::MESSAGING_SYSTEM => 'aws_sqs',
TraceAttributes::MESSAGING_DESTINATION_NAME => $queue->getQueue($queueName),
];
}
}
65 changes: 65 additions & 0 deletions src/Instrumentation/Laravel/src/Hooks/Queue/AttributesBuilder.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
<?php

declare(strict_types=1);

namespace OpenTelemetry\Contrib\Instrumentation\Laravel\Hooks\Queue;

use Illuminate\Contracts\Queue\Queue as QueueContract;
use Illuminate\Queue\RedisQueue;
use Illuminate\Queue\SqsQueue;
use OpenTelemetry\SemConv\TraceAttributes;
use OpenTelemetry\SemConv\TraceAttributeValues;

trait AttributesBuilder
{
private function buildMessageAttributes(
QueueContract $queue,
string $rawPayload,
string $queueName = null,
array $options = [],
mixed ...$params,
): array {
$payload = json_decode($rawPayload, true) ?? [];

return array_merge([
TraceAttributes::MESSAGING_MESSAGE_ID => $payload['uuid'] ?? $payload['id'] ?? null,
TraceAttributes::MESSAGING_MESSAGE_ENVELOPE_SIZE => strlen($rawPayload),
'messaging.message.job_name' => $payload['displayName'] ?? $payload['job'] ?? null,
'messaging.message.attempts' => $payload['attempts'] ?? 0,
'messaging.message.max_exceptions' => $payload['maxExceptions'] ?? null,
'messaging.message.max_tries' => $payload['maxTries'] ?? null,
'messaging.message.retry_until' => $payload['retryUntil'] ?? null,
'messaging.message.timeout' => $payload['timeout'] ?? null,
], $this->contextualMessageSystemAttributes($queue, $payload, $queueName, $options, ...$params));
}

private function contextualMessageSystemAttributes(
QueueContract $queue,
array $payload,
string $queueName = null,
array $options = [],
mixed ...$params,
): array {
return match (true) {
$queue instanceof RedisQueue => $this->redisContextualAttributes($queue, $payload, $queueName, $options, ...$params),
$queue instanceof SqsQueue => $this->awsSqsContextualAttributes($queue, $payload, $queueName, $options, ...$params),
default => [],
};
}

private function redisContextualAttributes(RedisQueue $queue, array $payload, string $queueName = null, array $options = [], mixed ...$params): array
{
return [
TraceAttributes::MESSAGING_SYSTEM => 'redis',
TraceAttributes::MESSAGING_DESTINATION_NAME => $queue->getQueue($queueName),
];
}

private function awsSqsContextualAttributes(SqsQueue $queue, array $payload, string $queueName = null, array $options = [], mixed ...$params): array
{
return [
TraceAttributes::MESSAGING_SYSTEM => TraceAttributeValues::MESSAGING_SYSTEM_AWS_SQS,
TraceAttributes::MESSAGING_DESTINATION_NAME => $queue->getQueue($queueName),
];
}
}
88 changes: 88 additions & 0 deletions src/Instrumentation/Laravel/src/Hooks/Queue/Worker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
<?php

declare(strict_types=1);

namespace OpenTelemetry\Contrib\Instrumentation\Laravel\Hooks\Queue;

use Illuminate\Contracts\Queue\Job;
use Illuminate\Queue\Worker as QueueWorker;
use Illuminate\Queue\WorkerOptions;
use OpenTelemetry\API\Trace\Propagation\TraceContextPropagator;
use OpenTelemetry\API\Trace\Span;
use OpenTelemetry\API\Trace\SpanKind;
use OpenTelemetry\API\Trace\StatusCode;
use OpenTelemetry\Context\Context;
use OpenTelemetry\Contrib\Instrumentation\Laravel\Hooks\HookInstance;
use function OpenTelemetry\Instrumentation\hook;
use OpenTelemetry\SemConv\TraceAttributes;
use Throwable;

class Worker
{
use AttributesBuilder;
use HookInstance;

public function instrument(): void
{
$this->hookWorkerProcess();
}

private function hookWorkerProcess(): bool
{
return hook(
QueueWorker::class,
'process',
pre: function (QueueWorker $worker, array $params, string $class, string $function, ?string $filename, ?int $lineno) {
$connectionName = (is_string($params[0] ?? null) ? $params[0] : null);
$job = ($params[1] instanceof Job ? $params[1] : null);
$workerOptions = ($params[2] instanceof WorkerOptions ? $params[2] : null);

$parent = TraceContextPropagator::getInstance()->extract(
$job?->payload() ?? [],
);

$queue = $worker->getManager()->connection($connectionName);
$attributes = $this->buildMessageAttributes($queue, $job->getRawBody(), $job->getQueue());

$span = $this->instrumentation
->tracer()
->spanBuilder(vsprintf('%s %s', [
$attributes[TraceAttributes::MESSAGING_DESTINATION_NAME] ?? '(anonymous)',
'process',
]))
->setSpanKind(SpanKind::KIND_CONSUMER)
->setParent($parent)
->startSpan()
->setAttributes($attributes);

Context::storage()->attach($span->storeInContext($parent));

return $params;
},
post: function (QueueWorker $worker, array $params, $returnValue, ?Throwable $exception) {
$scope = Context::storage()->scope();
if (!$scope) {
return;
}

$scope->detach();
$span = Span::fromContext($scope->context());
if ($exception) {
$span->recordException($exception, [TraceAttributes::EXCEPTION_ESCAPED => true]);
$span->setStatus(StatusCode::STATUS_ERROR, $exception->getMessage());
}

$connectionName = (is_string($params[0] ?? null) ? $params[0] : null);
$job = ($params[1] instanceof Job ? $params[1] : null);
$workerOptions = ($params[2] instanceof WorkerOptions ? $params[2] : null);

$span->setAttributes([
'messaging.message.deleted' => $job?->isDeleted(),
'messaging.message.released' => $job?->isReleased(),
]);

$span->end();
},
);
}
}

0 comments on commit 69117c8

Please sign in to comment.