Skip to content

Commit

Permalink
refactor(active_job): Use ActiveSupport instead of patches
Browse files Browse the repository at this point in the history
  • Loading branch information
arielvalentin committed Sep 28, 2023
1 parent d89b68e commit 4c659cc
Show file tree
Hide file tree
Showing 8 changed files with 230 additions and 194 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ def gem_version

def require_dependencies
require_relative 'patches/base'
require_relative 'patches/active_job_callbacks'
require_relative 'subscriber'
end

def patch_activejob
::ActiveJob::Base.prepend(Patches::Base)
::ActiveJob::Base.prepend(Patches::ActiveJobCallbacks)
::ActiveJob::Base.prepend(Patches::Base) unless ::ActiveJob::Base.ancestors.include?(Patches::Base)
Subscriber.install
end
end
end
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,33 @@ module Patches
module Base
def self.prepended(base)
base.class_eval do
attr_accessor :metadata
attr_accessor :__otel_headers
end
end

def initialize(*args)
@metadata = {}
def initialize(...)
@__otel_headers = {}
super
end
ruby2_keywords(:initialize) if respond_to?(:ruby2_keywords, true)

def serialize
super.merge('metadata' => serialize_arguments(metadata))
message = super

begin
message.merge!('__otel_headers' => serialize_arguments(@__otel_headers))
rescue StandardError => e
OpenTelemetry.handle_error(exception: e)
end

message
end

def deserialize(job_data)
self.metadata = deserialize_arguments(job_data['metadata'] || []).to_h
begin
@__otel_headers = deserialize_arguments(job_data.delete('__otel_headers') || []).to_h
rescue StandardError => e
OpenTelemetry.handle_error(exception: e)
end
super
end
end
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
# frozen_string_literal: true

require 'active_support/subscriber'

module OpenTelemetry
module Instrumentation
module ActiveJob
# Provides helper methods
#
module AttributeProcessor
def to_otel_semconv_attributes(job)
test_adapters = %w[async inline]

otel_attributes = {
'code.namespace' => job.class.name,
'messaging.destination_kind' => 'queue',
'messaging.system' => job.class.queue_adapter_name,
'messaging.destination' => job.queue_name,
'messaging.message_id' => job.job_id,
'rails.active_job.execution.counter' => job.executions + 1,
'rails.active_job.provider_job_id' => job.provider_job_id,
'rails.active_job.priority' => job.priority
}

otel_attributes['net.transport'] = 'inproc' if test_adapters.include?(job.class.queue_adapter_name)
otel_attributes.compact!

otel_attributes
end
end

# Default handler to creates internal spans for events
class DefaultHandler
include AttributeProcessor

def initialize(tracer)
@tracer = tracer
end

def on_start(name, _id, payload)
span = @tracer.start_span(name, attributes: to_otel_semconv_attributes(payload.fetch(:job)))
tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))]
OpenTelemetry.propagation.inject(payload.fetch(:job).__otel_headers) # This must be transmitted over the wire
{ span: span, ctx_tokens: tokens }
end
end

# Handles enqueue.active_job
class EnqueueHandler
include AttributeProcessor

def initialize(tracer)
@tracer = tracer
end

def on_start(name, _id, payload)
otel_config = ActiveJob::Instrumentation.instance.config
span_name = "#{otel_config[:span_naming] == :job_class ? payload.fetch(:job).class.name : payload.fetch(:job).queue_name} publish"
span = @tracer.start_span(span_name, kind: :producer, attributes: to_otel_semconv_attributes(payload.fetch(:job)))
tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))]
OpenTelemetry.propagation.inject(payload.fetch(:job).__otel_headers) # This must be transmitted over the wire
{ span: span, ctx_tokens: tokens }
end
end

# Handles perform.active_job
class PerformHandler
include AttributeProcessor

def initialize(tracer)
@tracer = tracer
end

def on_start(name, _id, payload)
tokens = []
parent_context = OpenTelemetry.propagation.extract(payload.fetch(:job).__otel_headers)
span_context = OpenTelemetry::Trace.current_span(parent_context).context

otel_config = ActiveJob::Instrumentation.instance.config
span_name = "#{otel_config[:span_naming] == :job_class ? payload.fetch(:job).class.name : payload.fetch(:job).queue_name} process"

propagation_style = otel_config[:propagation_style]
if propagation_style == :child
tokens << OpenTelemetry::Context.attach(parent_context)
span = @tracer.start_span(span_name, kind: :consumer, attributes: to_otel_semconv_attributes(payload.fetch(:job)))
else
links = [OpenTelemetry::Trace::Link.new(span_context)] if span_context.valid? && propagation_style == :link
span = @tracer.start_root_span(span_name, kind: :consumer, attributes: to_otel_semconv_attributes(payload.fetch(:job)), links: links)
end

tokens << OpenTelemetry::Context.attach(
OpenTelemetry::Trace.context_with_span(span)
)

{ span: span, ctx_tokens: tokens }
end
end

# Custom subscriber that handles ActiveJob notifications
class Subscriber < ::ActiveSupport::Subscriber
attr_reader :tracer

def initialize(...)
super
tracer = Instrumentation.instance.tracer
default_handler = DefaultHandler.new(tracer)
@handlers_by_pattern = {
'enqueue.active_job' => EnqueueHandler.new(tracer),
'perform.active_job' => PerformHandler.new(tracer)
}
@handlers_by_pattern.default = default_handler
end

# The methods below are the events the Subscriber is interested in.
def enqueue_at(...); end
def enqueue(...); end
def enqueue_retry(...); end
def perform_start(...); end
def perform(...); end
def retry_stopped(...); end
def discard(...); end

def start(name, id, payload)
begin
payload.merge!(__otel: @handlers_by_pattern[name].on_start(name, id, payload)) # The payload is _not_ transmitted over the wire
rescue StandardError => e
OpenTelemetry.handle_error(exception: e)
end

super
end

def finish(_name, _id, payload)
begin
otel = payload.delete(:__otel)
span = otel&.fetch(:span)
tokens = otel&.fetch(:ctx_tokens)
exception = payload[:error]
if exception
span&.record_exception(exception)
span&.status = OpenTelemetry::Trace::Status.error
end
rescue StandardError => e
OpenTelemetry.handle_error(exception: e)
end

super
ensure
begin
span&.finish
rescue StandardError => e
OpenTelemetry.handle_error(exception: e)
end
tokens&.reverse&.each do |token|
OpenTelemetry::Context.detach(token)
rescue StandardError => e
OpenTelemetry.handle_error(exception: e)
end
end

def self.install
attach_to :active_job
end

def self.uninstall
detach_from :active_job
end
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ Gem::Specification.new do |spec|
spec.add_development_dependency 'activejob', '>= 6.0.0'
spec.add_development_dependency 'appraisal', '~> 2.5'
spec.add_development_dependency 'bundler', '~> 2.4'
spec.add_development_dependency 'debug'
spec.add_development_dependency 'minitest', '~> 5.0'
spec.add_development_dependency 'opentelemetry-sdk', '~> 1.1'
spec.add_development_dependency 'opentelemetry-test-helpers', '~> 0.3'
spec.add_development_dependency 'pry'
spec.add_development_dependency 'rake', '~> 13.0'
spec.add_development_dependency 'rubocop', '~> 1.56.1'
spec.add_development_dependency 'simplecov', '~> 0.17.1'
Expand Down
Loading

0 comments on commit 4c659cc

Please sign in to comment.