Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Laravel: tracing Queues (+ hook refactoring) #250

Merged
merged 51 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
d76534d
Laravel: start of QueueWatcher.
ChrisLightfootWild Apr 2, 2024
a9864a3
Laravel: PR feedback.
ChrisLightfootWild Apr 3, 2024
71d999c
Laravel: moved additional attributes from event into span.
ChrisLightfootWild Apr 3, 2024
1aba286
Merge remote-tracking branch 'upstream/main' into laravel/queues
ChrisLightfootWild Apr 4, 2024
493e2d2
Laravel: moved propagation to root of message.
ChrisLightfootWild Apr 6, 2024
4498319
Laravel: experimenting with hooking \Illuminate\Contracts\Queue\Queue.
ChrisLightfootWild Apr 7, 2024
29330a1
Laravel: added queue context propagation via hook.
ChrisLightfootWild Apr 7, 2024
0ad58be
Update src/Instrumentation/Laravel/src/Hooks/Queue.php
ChrisLightfootWild Apr 8, 2024
1a596ff
Laravel: fix post-hook return. Thanks @brettmc
ChrisLightfootWild Apr 8, 2024
69117c8
Laravel: message processing on the consumer side.
ChrisLightfootWild Apr 8, 2024
17ad05f
Laravel: refining queue hooks.
ChrisLightfootWild Apr 9, 2024
b2ed01b
Laravel: removed the QueueWatcher in favour of hook.
ChrisLightfootWild Apr 10, 2024
550ddfb
Laravel: hook restructuring.
ChrisLightfootWild Apr 10, 2024
a220d7b
Merge remote-tracking branch 'upstream/main' into laravel/queues
ChrisLightfootWild Apr 10, 2024
f9d11fe
Laravel: moved ServeCommand hook.
ChrisLightfootWild Apr 10, 2024
29af9cb
Laravel: hook SyncQueue.
ChrisLightfootWild Apr 13, 2024
0a564e1
Laravel: moved main Application hook.
ChrisLightfootWild Apr 14, 2024
db1749c
Laravel: queue cleanup.
ChrisLightfootWild Apr 14, 2024
c734b78
Laravel: test queue tracing.
ChrisLightfootWild Apr 14, 2024
59f173c
Laravel: SyncQueue hook linting.
ChrisLightfootWild Apr 14, 2024
58667a2
Laravel: rebuilt docker image and newer otel-instrumentation fixed this.
ChrisLightfootWild Apr 14, 2024
9141441
Laravel: update QueueTest.
ChrisLightfootWild Apr 14, 2024
8a06b74
Laravel: hook Queue bulk & later.
ChrisLightfootWild Apr 14, 2024
57edcd9
Laravel: added estimate delivery_timestamp when passed to Queue->late…
ChrisLightfootWild Apr 14, 2024
e939785
Laravel: fixing psalm errors.
ChrisLightfootWild Apr 15, 2024
8654e59
Laravel: fixing phpstan errors.
ChrisLightfootWild Apr 15, 2024
7f6f028
Laravel: refactoring QueueTest.
ChrisLightfootWild Apr 15, 2024
1cf42cf
Laravel: added Queue->later tests.
ChrisLightfootWild Apr 15, 2024
a5a7262
Laravel: fix Queue->later tests.
ChrisLightfootWild Apr 16, 2024
3de767d
Laravel: added beanstalk message system detection.
ChrisLightfootWild Apr 16, 2024
58f5847
Laravel: added bulk message dispatch test.
ChrisLightfootWild Apr 16, 2024
406222c
Laravel: added redis connection test.
ChrisLightfootWild Apr 16, 2024
4c042b9
Laravel: record a span if a job is received from the queue.
ChrisLightfootWild Apr 16, 2024
c7bbe1a
Laravel: PR feedback to address closing incorrect span.
ChrisLightfootWild Apr 17, 2024
4086980
Laravel: PR feedback - change confusing ClockInterface mock.
ChrisLightfootWild Apr 19, 2024
f67e089
Merge remote-tracking branch 'upstream/main' into laravel/queues
ChrisLightfootWild Apr 19, 2024
6f3aa89
Laravel: prefer stable dependencies with lowest package compat.
ChrisLightfootWild Apr 19, 2024
9c1eebe
Laravel: fix dependencies for use with prefer-lowest.
ChrisLightfootWild Apr 19, 2024
52e30ab
Laravel: fix lowest dependencies.
ChrisLightfootWild Apr 21, 2024
5eac03b
Laravel: fix watcher PhanTypeArraySuspicious warnings.
ChrisLightfootWild Apr 21, 2024
1ef619b
Laravel: removed ClockInterface usage to avoid dependency on SDK.
ChrisLightfootWild Apr 21, 2024
84a4ea5
Laravel: more fixes for phan.
ChrisLightfootWild Apr 21, 2024
33b4cfa
Laravel: remove erroneous Context destroy usage.
ChrisLightfootWild Apr 21, 2024
e8a3ca1
Laravel: added interface LaravelHook.
ChrisLightfootWild Apr 21, 2024
75df94b
Laravel: suppress psalm warning.
ChrisLightfootWild Apr 21, 2024
6a83396
Laravel: moved Console/Kernel/Command hooks.
ChrisLightfootWild Apr 21, 2024
47e1ede
Laravel: export-ignore /.phan.
ChrisLightfootWild Apr 21, 2024
f4ea0ce
Laravel: explicitly list support for Laravel versions 6 to 11.
ChrisLightfootWild Apr 22, 2024
92a6325
Laravel: moved Http Kernel hooks.
ChrisLightfootWild Apr 22, 2024
ae2eeda
Laravel: consistent test naming.
ChrisLightfootWild Apr 22, 2024
a3e9dba
Laravel: test dropped empty queue receives.
ChrisLightfootWild Apr 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions src/Instrumentation/Laravel/src/Hooks/AbstractHook.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

declare(strict_types=1);

namespace OpenTelemetry\Contrib\Instrumentation\Laravel\Hooks;

use OpenTelemetry\API\Instrumentation\CachedInstrumentation;

abstract class AbstractHook
{
private static ?self $instance = null;

protected function __construct(
protected CachedInstrumentation $instrumentation,
) {
}

abstract public function instrument(): void;

public static function hook(CachedInstrumentation $instrumentation): static
{
if (static::$instance === null) {
static::$instance = new static($instrumentation);
static::$instance->instrument();
}

return static::$instance;
}
}
137 changes: 137 additions & 0 deletions src/Instrumentation/Laravel/src/Hooks/Queue.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
<?php

declare(strict_types=1);

namespace OpenTelemetry\Contrib\Instrumentation\Laravel\Hooks;

use Illuminate\Contracts\Queue\Queue as QueueContract;
use Illuminate\Queue\Queue as AbstractQueue;
use Illuminate\Queue\RedisQueue;
use Illuminate\Queue\SqsQueue;
use OpenTelemetry\API\Trace\Propagation\TraceContextPropagator;
use OpenTelemetry\API\Trace\Span;
use OpenTelemetry\API\Trace\SpanKind;
use OpenTelemetry\API\Trace\StatusCode;
use OpenTelemetry\Context\Context;
use function OpenTelemetry\Instrumentation\hook;
use OpenTelemetry\SemConv\TraceAttributes;
use Throwable;

class Queue extends AbstractHook
{
public function instrument(): void
{
$this->hookAbstractQueueCreatePayloadArray();
$this->hookQueuePushRaw();
}

protected function hookAbstractQueueCreatePayloadArray(): bool
{
// @todo: remove once post-hook return value works.
AbstractQueue::createPayloadUsing(function () {
$carrier = [];
TraceContextPropagator::getInstance()->inject($carrier);

return $carrier;
});

return hook(
AbstractQueue::class,
'createPayloadArray',
post: function (AbstractQueue $queue, array $params, array $payload, ?Throwable $exception) {
ChrisLightfootWild marked this conversation as resolved.
Show resolved Hide resolved
TraceContextPropagator::getInstance()->inject($payload);
return $payload;
},
);
}

protected function hookQueuePushRaw(): bool
{
return hook(
QueueContract::class,
'pushRaw',
pre: function (QueueContract $queue, array $params, string $class, string $function, ?string $filename, ?int $lineno) {
$attributes = $this->buildMessageAttributes($queue, ...$params);

$parent = Context::getCurrent();
$span = $this->instrumentation
->tracer()
->spanBuilder("{$attributes[TraceAttributes::MESSAGING_DESTINATION_NAME]} publish")
->setSpanKind(SpanKind::KIND_PRODUCER)
->startSpan()
->setAttributes($attributes);

Context::storage()->attach($span->storeInContext($parent));

return $params;
},
post: function (QueueContract $queue, array $params, $returnValue, ?Throwable $exception) {
$scope = Context::storage()->scope();
if (!$scope) {
return;
}

$scope->detach();
$span = Span::fromContext($scope->context());
if ($exception) {
$span->recordException($exception, [TraceAttributes::EXCEPTION_ESCAPED => true]);
$span->setStatus(StatusCode::STATUS_ERROR, $exception->getMessage());
}

$span->end();
},
);
}

private function buildMessageAttributes(
QueueContract $queue,
string $rawPayload,
string $queueName = null,
array $options = [],
mixed ...$params,
): array {
$payload = json_decode($rawPayload, true) ?? [];

return array_merge([
TraceAttributes::MESSAGING_DESTINATION_NAME => '(anonymous)',
TraceAttributes::MESSAGING_MESSAGE_ID => $payload['uuid'] ?? $payload['id'] ?? null,
TraceAttributes::MESSAGING_MESSAGE_ENVELOPE_SIZE => strlen($rawPayload),
'messaging.message.job_name' => $payload['displayName'] ?? $payload['job'] ?? null,
'messaging.message.attempts' => $payload['attempts'] ?? 0,
'messaging.message.max_exceptions' => $payload['maxExceptions'] ?? null,
'messaging.message.max_tries' => $payload['maxTries'] ?? null,
'messaging.message.retry_until' => $payload['retryUntil'] ?? null,
'messaging.message.timeout' => $payload['timeout'] ?? null,
], $this->contextualMessageSystemAttributes($queue, $payload, $queueName, $options, ...$params));
}

private function contextualMessageSystemAttributes(
QueueContract $queue,
array $payload,
string $queueName = null,
array $options = [],
mixed ...$params,
): array {
return match (true) {
$queue instanceof RedisQueue => $this->redisContextualAttributes($queue, $payload, $queueName, $options, ...$params),
$queue instanceof SqsQueue => $this->awsSqsContextualAttributes($queue, $payload, $queueName, $options, ...$params),
default => [],
};
}

private function redisContextualAttributes(RedisQueue $queue, array $payload, string $queueName = null, array $options = [], mixed ...$params): array
{
return [
TraceAttributes::MESSAGING_SYSTEM => 'redis',
TraceAttributes::MESSAGING_DESTINATION_NAME => $queue->getQueue($queueName),
];
}

private function awsSqsContextualAttributes(SqsQueue $queue, array $payload, string $queueName = null, array $options = [], mixed ...$params): array
{
return [
TraceAttributes::MESSAGING_SYSTEM => 'aws_sqs',
TraceAttributes::MESSAGING_DESTINATION_NAME => $queue->getQueue($queueName),
];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public static function register(): void

ConsoleInstrumentation::register($instrumentation);
HttpInstrumentation::register($instrumentation);
Hooks\Queue::hook($instrumentation);

self::developmentInstrumentation();
}
Expand Down
86 changes: 86 additions & 0 deletions src/Instrumentation/Laravel/src/Watchers/QueueWatcher.php
ChrisLightfootWild marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
<?php

declare(strict_types=1);

namespace OpenTelemetry\Contrib\Instrumentation\Laravel\Watchers;

use Illuminate\Contracts\Foundation\Application;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
use Illuminate\Queue\Queue;
use OpenTelemetry\API\Instrumentation\CachedInstrumentation;
use OpenTelemetry\API\Trace\Propagation\TraceContextPropagator;
use OpenTelemetry\API\Trace\Span;
use OpenTelemetry\API\Trace\SpanKind;
use OpenTelemetry\Context\Context;
use OpenTelemetry\SemConv\TraceAttributes;

class QueueWatcher extends Watcher
{
public function __construct(
private CachedInstrumentation $instrumentation,
) {
}

/** @psalm-suppress UndefinedInterfaceMethod */
public function register(Application $app): void
{
Queue::createPayloadUsing(function () {
$carrier = [];
TraceContextPropagator::getInstance()->inject($carrier);

return $carrier;
});

$app['events']->listen(JobProcessing::class, [$this, 'handleJobProcessing']);
$app['events']->listen(JobProcessed::class, [$this, 'handleJobProcessed']);
}

public function handleJobProcessing(JobProcessing $jobProcessing): void
{
$parent = TraceContextPropagator::getInstance()->extract(
$jobProcessing->job->payload(),
);

$span = $this->instrumentation
->tracer()
->spanBuilder('queue')
->setSpanKind(SpanKind::KIND_CONSUMER)
->setParent($parent)
->startSpan();

$span->setAttributes([
TraceAttributes::MESSAGING_SYSTEM => $jobProcessing->job->getConnectionName(),
TraceAttributes::MESSAGING_DESTINATION_NAME => $jobProcessing->job->getQueue(),
TraceAttributes::MESSAGING_MESSAGE_ID => $jobProcessing->job->uuid(),
TraceAttributes::MESSAGING_MESSAGE_ENVELOPE_SIZE => strlen($jobProcessing->job->getRawBody()),
TraceAttributes::MESSAGING_MESSAGE_CONVERSATION_ID => $jobProcessing->job->getJobId(),
// Existing sem-conv might not provide for the below, but hopefully this is permissible for now.
'messaging.message.job_name' => $jobProcessing->job->resolveName(),
'messaging.message.attempts' => $jobProcessing->job->attempts(),
'messaging.message.max_exceptions' => $jobProcessing->job->maxExceptions(),
'messaging.message.max_tries' => $jobProcessing->job->maxTries(),
'messaging.message.retry_until' => $jobProcessing->job->retryUntil(),
'messaging.message.timeout' => $jobProcessing->job->timeout(),
]);

Context::storage()->attach($span->storeInContext($parent));
}

public function handleJobProcessed(JobProcessed $jobProcessed): void
{
$scope = Context::storage()->scope();
if (!$scope) {
return;
}
$scope->detach();
$span = Span::fromContext($scope->context());

$span->setAttributes([
'messaging.message.deleted' => $jobProcessed->job->isDeleted(),
'messaging.message.released' => $jobProcessed->job->isReleased(),
]);

$span->end();
}
}
Loading