feat!(active_job): Use ActiveSupport instead of patches #677

Merged
38 commits
55e11e3
feat!(active_job): Use ActiveSupport instead of patches
arielvalentin Sep 28, 2023
44cb141
refactor: Register discard job
arielvalentin Oct 8, 2023
9e20af7
refactor: independent registration
arielvalentin Oct 8, 2023
76dd2a3
refactor: Use template methods for children
arielvalentin Oct 8, 2023
f710452
refactor: some more small fixes
arielvalentin Oct 8, 2023
d3083b2
refactor: a little clean up
arielvalentin Oct 8, 2023
9042cdf
refactor: a little more clarity
arielvalentin Oct 8, 2023
2ae1184
refactor: try to make sense of a few things
arielvalentin Oct 8, 2023
f79e1e8
refactor: fix test directory structure
arielvalentin Oct 8, 2023
918c919
refactor: extract to files
arielvalentin Oct 9, 2023
92dad7a
add some docs
arielvalentin Oct 9, 2023
0bad18c
refactor: clean up and renames
arielvalentin Oct 9, 2023
dd297ef
refactor: split tests out per handler
arielvalentin Oct 9, 2023
b433aa7
refactor... maybe a little overengineering
arielvalentin Oct 10, 2023
e88c476
fix: add examples of in-line instrumentation
arielvalentin Oct 11, 2023
10021d1
Update README.md
arielvalentin Oct 14, 2023
45019ae
Update README.md
arielvalentin Oct 14, 2023
2b9078b
Update default.rb
arielvalentin Oct 14, 2023
a345fa2
Update perform.rb
arielvalentin Oct 14, 2023
57e956e
squash: Use Singleton Tracer
arielvalentin Nov 1, 2023
8329bb5
Merge branch 'main' into active-job-instrumentation-redux
arielvalentin Nov 7, 2023
6d2fbf8
fix: Use top level namespaces
arielvalentin Nov 9, 2023
3a56bdb
Merge branch 'main' into active-job-instrumentation-redux
arielvalentin Nov 9, 2023
fe751b7
Merge branch 'main' into active-job-instrumentation-redux
arielvalentin Nov 19, 2023
3adaf59
squash: Update instrumentation/active_job/README.md
arielvalentin Nov 20, 2023
0eecaa5
squash: Update docs
arielvalentin Nov 21, 2023
6a6713f
squash: Reduce Local Variable Allocation
arielvalentin Nov 21, 2023
d6bccf3
squash: Safe navigation no longer required
arielvalentin Nov 21, 2023
c7f84b8
squash: fix bad commit
arielvalentin Nov 21, 2023
8237302
squash: Remove possible PII from status message
arielvalentin Nov 21, 2023
0287169
squash: PR Feedback
arielvalentin Nov 21, 2023
c0655eb
squash: Use up to date semconv and remove unused attrs
arielvalentin Nov 21, 2023
50ac4b5
squash: remove unused file
arielvalentin Nov 21, 2023
e3e096c
squash: fix test
arielvalentin Nov 21, 2023
071dc7a
squash: fix messaging id
arielvalentin Nov 21, 2023
7639556
squash: More semconv fixes
arielvalentin Nov 21, 2023
11799a9
squash: fix ids
arielvalentin Nov 21, 2023
7235421
squash: feedback from Ruby SIG
arielvalentin Nov 21, 2023
58 changes: 58 additions & 0 deletions instrumentation/active_job/README.md
@@ -30,6 +30,64 @@ OpenTelemetry::SDK.configure do |c|
end
```

## Active Support Instrumentation

Earlier versions of this instrumentation registered custom `around_perform` hooks to work around limitations
in `ActiveSupport::Notifications`. Those patches, however, led to error reports and inconsistent behavior when combined with other gems.

This instrumentation now relies entirely on `ActiveSupport::Notifications` and registers a custom Subscriber that listens to relevant events to report as spans.

The table below lists which [Rails Framework Hook Events](https://guides.rubyonrails.org/active_support_instrumentation.html#active-job) this instrumentation records:

| Event Name | Creates Span? | Notes |
| - | - | - |
| `enqueue_at.active_job` | :white_check_mark: | Creates an egress span with kind `producer` |
| `enqueue.active_job` | :white_check_mark: | Creates an egress span with kind `producer` |
| `enqueue_retry.active_job` | :white_check_mark: | Creates an `internal` span |
| `perform_start.active_job` | :x: | This is invoked prior to the appropriate ingress point and is therefore ignored |
| `perform.active_job` | :white_check_mark: | Creates an ingress span with kind `consumer` |
| `retry_stopped.active_job` | :white_check_mark: | Creates an `internal` span with an `exception` event |
| `discard.active_job` | :white_check_mark: | Creates an `internal` span with an `exception` event |
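
As a quick reference, the table above can be transcribed into a lookup. This is only a sketch: `SPAN_KIND_BY_EVENT` and `span_kind_for` are hypothetical names chosen for illustration and are not part of the gem.

```ruby
# Hypothetical lookup transcribed from the table above; not part of the gem.
# A nil value marks an event for which no span is created.
SPAN_KIND_BY_EVENT = {
  'enqueue_at.active_job' => :producer,
  'enqueue.active_job' => :producer,
  'enqueue_retry.active_job' => :internal,
  'perform_start.active_job' => nil,
  'perform.active_job' => :consumer,
  'retry_stopped.active_job' => :internal,
  'discard.active_job' => :internal
}.freeze

# Returns the span kind for an event name, or nil when no span is created
# (this includes event names the table does not list).
def span_kind_for(event_name)
  SPAN_KIND_BY_EVENT[event_name]
end

p span_kind_for('perform.active_job')       # => :consumer
p span_kind_for('perform_start.active_job') # => nil
```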

## Semantic Conventions

This instrumentation generally uses [Messaging semantic conventions](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/) by treating job enqueuers as `producers` and workers as `consumers`.

Internal spans are named using the name of the `ActiveSupport` event that was provided.

The following attributes are recorded. Those that are specific to this instrumentation live under `messaging.active_job.*`:

| Attribute Name | Type | Notes |
| - | - | - |
| `code.namespace` | String | `ActiveJob` class name |
| `messaging.system` | String | Static value set to `active_job` |
| `messaging.destination` | String | Set from `ActiveJob#queue_name` |
| `messaging.message.id` | String | Set from `ActiveJob#job_id` |
| `messaging.active_job.adapter.name` | String | The name of the `ActiveJob` adapter implementation |
| `messaging.active_job.message.priority` | String | Present when set by the client from `ActiveJob#priority` |
| `messaging.active_job.message.provider_job_id` | String | Present if the underlying adapter has backend specific message ids |
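
To make the table concrete, here is a minimal sketch of how a job could be mapped to these attributes. The gem's own mapper is not shown in this excerpt, so `FakeJob` and `job_attributes` are hypothetical stand-ins; only the reader names (`job_id`, `queue_name`, `priority`, `provider_job_id`) and the attribute keys come from the table.

```ruby
# Hypothetical stand-in for an ActiveJob instance, exposing the readers named in the table.
FakeJob = Struct.new(:job_id, :queue_name, :priority, :provider_job_id, keyword_init: true)

# Builds the attribute hash described in the table.
# Optional attributes are dropped when the job does not set them.
def job_attributes(job, adapter_name:)
  {
    'code.namespace' => job.class.name,
    'messaging.system' => 'active_job',
    'messaging.destination' => job.queue_name,
    'messaging.message.id' => job.job_id,
    'messaging.active_job.adapter.name' => adapter_name,
    'messaging.active_job.message.priority' => job.priority&.to_s,
    'messaging.active_job.message.provider_job_id' => job.provider_job_id&.to_s
  }.compact
end

job = FakeJob.new(job_id: 'abc-123', queue_name: 'demo')
p job_attributes(job, adapter_name: 'async')
```

Using `compact` keeps the optional attributes absent rather than recording them with empty values, which matches the "Present when set" wording in the table.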

## Differences between ActiveJob versions

### ActiveJob 6.1

`perform.active_job` events do not include timings for `ActiveJob` callbacks, so time spent in `before` and `after` hooks will be missing.

### ActiveJob 7+

`perform.active_job` no longer includes exceptions handled using `rescue_from` in the payload.

To preserve this behavior, update the span yourself in the `rescue_from` handler, e.g.

```ruby
rescue_from MyCustomError do |e|
  # Custom code to handle the error
  span = OpenTelemetry::Instrumentation::ActiveJob.current_span
  span.record_exception(e)
  span.status = OpenTelemetry::Trace::Status.error('Job failed')
end
```

## Examples

See [`./example/active_job.rb`](https://github.com/open-telemetry/opentelemetry-ruby-contrib/blob/main/instrumentation/active_job/example/active_job.rb) for example usage.
8 changes: 0 additions & 8 deletions instrumentation/active_job/example/Gemfile

This file was deleted.

99 changes: 92 additions & 7 deletions instrumentation/active_job/example/active_job.rb
@@ -4,30 +4,115 @@
#
# SPDX-License-Identifier: Apache-2.0

ENV['OTEL_SERVICE_NAME'] ||= 'otel-active-job-demo'
require 'bundler/inline'

gemfile do
  source 'https://rubygems.org'
  gem 'activejob', '~> 7.0.0', require: 'active_job'
  gem 'opentelemetry-instrumentation-active_job', path: '../'
  gem 'opentelemetry-sdk'
  gem 'opentelemetry-exporter-otlp'
end

ENV['OTEL_LOG_LEVEL'] ||= 'fatal'
ENV['OTEL_TRACES_EXPORTER'] ||= 'console'
OpenTelemetry::SDK.configure do |c|
  c.use 'OpenTelemetry::Instrumentation::ActiveJob'
  at_exit { OpenTelemetry.tracer_provider.shutdown }
end

class FailingJob < ::ActiveJob::Base
  queue_as :demo
  def perform
    raise 'this job failed'
  end
end

class FailingRetryJob < ::ActiveJob::Base
  queue_as :demo

  retry_on StandardError, attempts: 2, wait: 0
  def perform
    raise 'this job failed'
  end
end

class RetryJob < ::ActiveJob::Base
  queue_as :demo

  retry_on StandardError, attempts: 3, wait: 0
  def perform
    if executions < 3
      raise 'this job failed'
    else
      puts <<~EOS

        --------------------------------------------------
        Done Retrying!
        --------------------------------------------------

      EOS
    end
  end
end

class DiscardJob < ::ActiveJob::Base
  queue_as :demo

  class DiscardError < StandardError; end

  discard_on DiscardError

  def perform
    raise DiscardError, 'this job failed'
  end
end

EXAMPLE_TRACER = OpenTelemetry.tracer_provider.tracer('activejob-example', '1.0')

class TestJob < ::ActiveJob::Base
  def perform
    EXAMPLE_TRACER.in_span("custom span") do
      puts <<~EOS

        --------------------------------------------------
        The computer is doing some work, beep beep boop.
        --------------------------------------------------

      EOS
    end
  end
end

class DoItNowJob < ::ActiveJob::Base
  def perform
    $stderr.puts <<~EOS

      --------------------------------------------------
      The computer is doing some work, beep beep boop.
      Called with perform_now!
      --------------------------------------------------

    EOS
  end
end

class BatchJob < ::ActiveJob::Base
  def perform
    TestJob.perform_later
    FailingJob.perform_later
    FailingRetryJob.perform_later
    RetryJob.perform_later
    DiscardJob.perform_later
  end
end

::ActiveJob::Base.queue_adapter = :async

TestJob.perform_later
sleep 0.1 # To ensure we see both spans!
EXAMPLE_TRACER.in_span('run-jobs') do
  DoItNowJob.perform_now
  BatchJob.perform_later
end

sleep 5 # allow the job to complete
@@ -11,6 +11,39 @@ module OpenTelemetry
  module Instrumentation
    # Contains the OpenTelemetry instrumentation for the ActiveJob gem
    module ActiveJob
Contributor

I see a lot of duplication between this file and the Rack instrumentation file. It might be a good future candidate for a helper gem.

Collaborator Author

Yes!

I was thinking of making a generic "ingress" span Context that would be instrumentation-agnostic, so that any child span can access the ingress span and enrich it.

So instead of saying Rack.current_span or ActiveJob.current_span it would be Ingress.current_span.

      extend self

      CURRENT_SPAN_KEY = Context.create_key('current-span')
      private_constant :CURRENT_SPAN_KEY

      # Returns the current span from the current or provided context
      #
      # @param [optional Context] context The context to lookup the current
      #   {Span} from. Defaults to Context.current
      def current_span(context = nil)
        context ||= Context.current
        context.value(CURRENT_SPAN_KEY) || OpenTelemetry::Trace::Span::INVALID
      end

      # Returns a context containing the span, derived from the optional parent
      # context, or the current context if one was not provided.
      #
      # @param [Span] span The span to store in the returned context
      # @param [optional Context] parent_context The context to use as the parent for
      #   the returned context
      def context_with_span(span, parent_context: Context.current)
        parent_context.set_value(CURRENT_SPAN_KEY, span)
      end

      # Activates/deactivates the Span within the current Context, which makes the "current span"
      # available implicitly.
      #
      # On exit, the Span that was active before calling this method will be reactivated.
      #
      # @param [Span] span the span to activate
      # @yield [span, context] yields span and a context containing the span to the block.
      def with_span(span)
Contributor

I think this is never called and that the context is never set.

This is causing link propagation to fail.

Collaborator Author

Do you have any additional details you would like to share in an issue?

Contributor

Here we go! #1131

I was working on that, but it took me longer than expected to write up the issue. 😅

        Context.with_value(CURRENT_SPAN_KEY, span) { |c, s| yield s, c }
      end
    end
  end
end
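
The activate-then-restore contract that `with_span` documents can be illustrated without any OpenTelemetry dependency. The sketch below is a hypothetical stand-in: `TinyContext` and its placeholder `INVALID` value are invented for illustration, and it stores the span in fiber-local storage rather than in an OpenTelemetry `Context`.

```ruby
# Hypothetical, dependency-free stand-in for the helpers in this file.
module TinyContext
  KEY = :tiny_context_current_span
  INVALID = :invalid_span # plays the role of OpenTelemetry::Trace::Span::INVALID

  module_function

  # Returns the active span, or the INVALID placeholder when none is active.
  def current_span
    Thread.current[KEY] || INVALID
  end

  # Activates the span for the duration of the block, then restores the
  # previously active one, even if the block raises.
  def with_span(span)
    previous = Thread.current[KEY]
    Thread.current[KEY] = span
    yield span
  ensure
    Thread.current[KEY] = previous
  end
end

TinyContext.with_span(:perform_span) do
  TinyContext.with_span(:custom_span) { p TinyContext.current_span } # => :custom_span
  p TinyContext.current_span # => :perform_span
end
p TinyContext.current_span # => :invalid_span
```

The point of the sketch is that `current_span` only returns something useful while a `with_span` block is on the stack; if nothing ever activates the span, callers always get the invalid placeholder.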
@@ -0,0 +1,73 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

require_relative 'mappers/attribute'
require_relative 'handlers/default'
require_relative 'handlers/enqueue'
require_relative 'handlers/perform'

module OpenTelemetry
  module Instrumentation
    module ActiveJob
      # Module that contains custom event handlers, which are used to generate spans per event
      module Handlers
        module_function

        # Subscribes Event Handlers to relevant ActiveJob notifications
        #
        # The following events are recorded as spans:
        # - enqueue
        # - enqueue_at
        # - enqueue_retry
        # - perform
        # - retry_stopped
        # - discard
        #
        # Ingress and Egress spans (perform, enqueue, enqueue_at) use Messaging semantic conventions for naming the span,
        # while internal spans keep their ActiveSupport event name.
        #
        # @note this method is not thread-safe and should not be used in a multi-threaded context
        # @note Why no perform_start?
        #       This event causes much heartache as it is the first in a series of events that is triggered.
        #       It should not be the ingress span because it does not measure anything.
        #       https://github.com/rails/rails/blob/v6.1.7.6/activejob/lib/active_job/instrumentation.rb#L14
        #       https://github.com/rails/rails/blob/v7.0.8/activejob/lib/active_job/instrumentation.rb#L19
        def subscribe
          return unless Array(@subscriptions).empty?

          mapper = Mappers::Attribute.new
          config = ActiveJob::Instrumentation.instance.config
          parent_span_provider = OpenTelemetry::Instrumentation::ActiveJob

          # TODO, use delegation instead of inheritance
          default_handler = Handlers::Default.new(parent_span_provider, mapper, config)
          enqueue_handler = Handlers::Enqueue.new(parent_span_provider, mapper, config)
          perform_handler = Handlers::Perform.new(parent_span_provider, mapper, config)

          handlers_by_pattern = {
            'enqueue' => enqueue_handler,
            'enqueue_at' => enqueue_handler,
            'enqueue_retry' => default_handler,
            'perform' => perform_handler,
            'retry_stopped' => default_handler,
            'discard' => default_handler
          }

          @subscriptions = handlers_by_pattern.map do |key, handler|
            ::ActiveSupport::Notifications.subscribe("#{key}.active_job", handler)
          end
        end

        # Removes Event Handler Subscriptions for ActiveJob notifications
        # @note this method is not thread-safe and should not be used in a multi-threaded context
        def unsubscribe
          @subscriptions&.each { |subscriber| ActiveSupport::Notifications.unsubscribe(subscriber) }
Contributor

This needs to be ::ActiveSupport::Notifications, otherwise it throws:

uninitialized constant OpenTelemetry::Instrumentation::ActiveSupport::Notifications
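
The error quoted above follows from Ruby's lexical constant lookup. A minimal sketch, assuming (as the error message implies) that a module named `OpenTelemetry::Instrumentation::ActiveSupport` is loaded alongside this one: every module and method below is a stand-in defined only for illustration, not the real gems.

```ruby
# Stand-in for the real top-level ActiveSupport::Notifications (illustration only).
module ActiveSupport
  module Notifications
    def self.unsubscribe(subscriber)
      "unsubscribed #{subscriber}"
    end
  end
end

module OpenTelemetry
  module Instrumentation
    # Stand-in for a sibling instrumentation namespace that shadows ::ActiveSupport.
    module ActiveSupport; end

    module ActiveJob
      module Handlers
        module_function

        # Resolves ActiveSupport lexically to OpenTelemetry::Instrumentation::ActiveSupport,
        # which has no Notifications constant, so this raises NameError.
        def relative_lookup
          ActiveSupport::Notifications.unsubscribe(:subscriber)
        end

        # The leading :: forces lookup from the top level and finds the real module.
        def absolute_lookup
          ::ActiveSupport::Notifications.unsubscribe(:subscriber)
        end
      end
    end
  end
end

handlers = OpenTelemetry::Instrumentation::ActiveJob::Handlers
puts handlers.absolute_lookup
begin
  handlers.relative_lookup
rescue NameError => e
  puts e.message
end
```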

Collaborator Author

Similarly, would you be able to provide us with a PR to address this problem?

          @subscriptions = nil
        end
      end
    end
  end
end
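
The guard in `subscribe` and the reset in `unsubscribe` together make registration idempotent. The sketch below shows that shape in isolation; `TinyNotifier` and `Registry` are hypothetical stand-ins for `ActiveSupport::Notifications` and the `Handlers` module, written only to illustrate the pattern.

```ruby
# Hypothetical in-memory notifier standing in for ActiveSupport::Notifications.
class TinyNotifier
  attr_reader :subscribers

  def initialize
    @subscribers = []
  end

  # Registers a handler for a pattern and returns the subscription object.
  def subscribe(pattern, handler)
    subscription = [pattern, handler]
    @subscribers << subscription
    subscription
  end

  def unsubscribe(subscription)
    @subscribers.delete(subscription)
  end
end

# Mirrors the guard-and-reset shape of Handlers.subscribe / Handlers.unsubscribe.
class Registry
  EVENTS = %w[enqueue enqueue_at enqueue_retry perform retry_stopped discard].freeze

  def initialize(notifier)
    @notifier = notifier
    @subscriptions = nil
  end

  def subscribe(handler)
    # Already subscribed: do nothing, so repeated calls never double-register.
    return unless Array(@subscriptions).empty?

    @subscriptions = EVENTS.map { |key| @notifier.subscribe("#{key}.active_job", handler) }
  end

  def unsubscribe
    @subscriptions&.each { |subscription| @notifier.unsubscribe(subscription) }
    @subscriptions = nil
  end
end

notifier = TinyNotifier.new
registry = Registry.new(notifier)
registry.subscribe(:handler)
registry.subscribe(:handler) # no-op thanks to the guard
puts notifier.subscribers.size # 6
registry.unsubscribe
puts notifier.subscribers.size # 0
```

As the `@note` in the source warns, this check-then-assign sequence is not atomic, so the same caveat about multi-threaded use applies to the sketch.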