Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added OTLP Attributes and testing #10

Merged
merged 9 commits into from
Mar 21, 2022
27 changes: 15 additions & 12 deletions lib/commanded/aggregate.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,22 @@ defmodule OpentelemetryCommanded.Aggregate do

def handle_start(_event, _, meta, _) do
context = meta.execution_context
trace_headers = decode_headers(context.metadata["trace_ctx"])
:otel_propagator_text_map.extract(trace_headers)

safe_context_propagation(context.metadata["trace_ctx"])

attributes = [
"command.type": struct_name(context.command),
"command.handler": context.handler,
"aggregate.uuid": meta.aggregate_uuid,
"aggregate.version": meta.aggregate_version,
application: meta.application,
"causation.id": context.causation_id,
"correlation.id": context.correlation_id,
"aggregate.function": context.function,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still want these two fields?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aggregate.function": context.function is still there, just massaged to fit naming for messaging attributes:

      "messaging.commanded.function": context.function

I can restore context.lifespan but while testing, didn't see it's value as an attribute since it doesn't effect the processing of the current span. Thoughts?

"aggregate.lifespan": context.lifespan
"messaging.system": "commanded",
"messaging.protocol": "cqrs",
"messaging.destination_kind": "aggregate",
"messaging.operation": "receive",
"messaging.destination": context.handler,
"messaging.message_id": context.causation_id,
"messaging.conversation_id": context.correlation_id,
"messaging.commanded.aggregate_uuid": meta.aggregate_uuid,
"messaging.commanded.aggregate_version": meta.aggregate_version,
"messaging.commanded.application": meta.application,
"messaging.commanded.command": struct_name(context.command),
"messaging.commanded.function": context.function
]

OpentelemetryTelemetry.start_telemetry_span(
Expand All @@ -65,7 +68,7 @@ defmodule OpentelemetryCommanded.Aggregate do
ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, meta)

events = Map.get(meta, :events, [])
Span.set_attribute(ctx, :"event.count", Enum.count(events))
Span.set_attribute(ctx, :"messaging.commanded.event_count", Enum.count(events))

if error = meta[:error] do
Span.set_status(ctx, OpenTelemetry.status(:error, inspect(error)))
Expand Down
19 changes: 12 additions & 7 deletions lib/commanded/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,19 @@ defmodule OpentelemetryCommanded.Application do
def handle_start(_event, _, meta, _) do
context = meta.execution_context

safe_context_propagation(context.metadata["trace_ctx"])

attributes = [
"command.type": struct_name(context.command),
"command.handler": context.handler,
application: meta.application,
"causation.id": context.causation_id,
"correlation.id": context.correlation_id,
"aggregate.function": context.function,
"aggregate.lifespan": context.lifespan
"messaging.system": "commanded",
"messaging.protocol": "cqrs",
"messaging.destination_kind": "command_handler",
"messaging.operation": "receive",
"messaging.destination": context.handler,
"messaging.message_id": context.causation_id,
"messaging.conversation_id": context.correlation_id,
"messaging.commanded.application": meta.application,
"messaging.commanded.command": struct_name(context.command),
"messaging.commanded.function": context.function
]

OpentelemetryTelemetry.start_telemetry_span(
Expand Down
30 changes: 17 additions & 13 deletions lib/commanded/event_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,27 @@ defmodule OpentelemetryCommanded.EventHandler do
end

def handle_start(_event, _measurements, meta, _) do
event = meta.recorded_event
trace_headers = decode_headers(event.metadata["trace_ctx"])
:otel_propagator_text_map.extract(trace_headers)
recorded_event = meta.recorded_event

safe_context_propagation(recorded_event.metadata["trace_ctx"])

attributes = [
"causation.id": event.causation_id,
"correlation.id": event.correlation_id,
"event.id": event.event_id,
"event.number": event.event_number,
"event.type": event.event_type,
"stream.id": event.stream_id,
"stream.version": event.stream_version,
application: meta.application,
"messaging.system": "commanded",
"messaging.protocol": "cqrs",
"messaging.destination_kind": "event_handler",
"messaging.operation": "receive",
"messaging.message_id": recorded_event.causation_id,
"messaging.conversation_id": recorded_event.correlation_id,
"messaging.destination": meta.handler_module,
"messaging.commanded.application": meta.application,
"messaging.commanded.event": recorded_event.event_type,
"messaging.commanded.event_id": recorded_event.event_id,
"messaging.commanded.event_number": recorded_event.event_number,
"messaging.commanded.stream_id": recorded_event.stream_id,
"messaging.commanded.stream_version": recorded_event.stream_version,
"messaging.commanded.handler_name": meta.handler_name
# TODO add back
# consistency: meta.consistency,
"handler.module": meta.handler_module,
"handler.name": meta.handler_name
# TODO add this back into commanded
# "event.last_seen": meta.last_seen_event
]
Expand Down
129 changes: 96 additions & 33 deletions lib/commanded/event_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,48 +3,111 @@ defmodule OpentelemetryCommanded.EventStore do

require OpenTelemetry.Tracer

alias OpenTelemetry.{Tracer, Span}
import OpentelemetryCommanded.Util

alias OpenTelemetry.Span

@tracer_id __MODULE__

def setup do
:telemetry.attach_many(
{__MODULE__, :stop},
[
[:commanded, :event_store, :stream_forward, :stop],
[:commanded, :event_store, :append_to_stream, :stop]
],
&__MODULE__.handle_stop/4,
[]
)
~w(
ack_event
adapter
append_to_stream
delete_snapshot
delete_subscription
read_snapshot
record_snapshot
stream_forward
stream_forward
stream_forward
subscribe
subscribe_to
subscribe_to
unsubscribe
)a
|> Enum.each(fn event ->
:telemetry.attach(
{__MODULE__, :start},
[:commanded, :event_store, event, :start],
&__MODULE__.handle_start/4,
[]
)

:telemetry.attach_many(
{__MODULE__, :exception},
[
[:commanded, :event_store, :stream_forward, :exception],
[:commanded, :event_store, :append_to_stream, :exception]
],
&__MODULE__.handle_stop/4,
[]
)
:telemetry.attach(
{__MODULE__, :stop},
[:commanded, :event_store, event, :stop],
&__MODULE__.handle_stop/4,
[]
)

:telemetry.attach(
{__MODULE__, :exception},
[:commanded, :event_store, event, :exception],
&__MODULE__.handle_exception/4,
[]
)
end)
end

def handle_stop([_, _, action, type], measurements, meta, _) do
end_time = :opentelemetry.timestamp()
start_time = end_time - measurements.duration
attributes = meta |> Map.take([:application, :stream_uuid]) |> Enum.to_list()
span_name = :"commanded.event_store.#{action}"
def handle_start([_, _, action, _type], _measurements, meta, _) do
event = meta.event

safe_context_propagation(event.metadata["trace_ctx"])

attributes = [
"messaging.system": "commanded",
"messaging.protocol": "cqrs",

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cqrs isn't a transport protocol, it's a pattern. This attribute should probably not be included.

"messaging.destination_kind": "event_store",
# TODO does this need to change based on the event? Or maybe `internal`?
"messaging.operation": "receive",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bryannaegele
For spans created by a messaging system all have operation types from that pool (send, receive, subscribe)?

This doesn't seem to fit well. It seems an "internal" span w/i the CQRS messaging system.

Thoughts?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know where subscribe comes from. Messaging operations are send, receive, process. Span type of internal is only appropriate where a function is calling another function. If there's a message or operation crossing a system boundary, e.g. to the database, then it's not internal.

CQRS is a bit of a hybrid between Messaging and RPC, so I'm not sure of all the specifics of how it should be treated. You can always solicit opinions in the #opentelemetry channel of the CNCF slack.

"messaging.message_id": event.causation_id,
"messaging.conversation_id": event.correlation_id,
# "messaging.destination": meta.handler_module,
"messaging.commanded.application": meta.application,
"messaging.commanded.event": event.event_type,
"messaging.commanded.event_id": event.event_id,
"messaging.commanded.event_number": event.event_number,
"messaging.commanded.stream_id": event.stream_id,
"messaging.commanded.stream_version": event.stream_version
]

Tracer.start_span(span_name, %{start_time: start_time, attributes: attributes})
OpentelemetryTelemetry.start_telemetry_span(
@tracer_id,
"commanded.event_store.#{action}",
meta,
%{
kind: :consumer,
attributes: attributes
}
)
end

if type == :exception do
ctx = Tracer.current_span_ctx()
reason = meta[:reason]
stacktrace = meta[:stacktrace]
def handle_stop(_event, _measurements, meta, _) do
ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, meta)

exception = Exception.normalize(meta[:kind], reason, stacktrace)
Span.record_exception(ctx, exception, stacktrace)
Span.set_status(ctx, OpenTelemetry.status(:error, ""))
if error = meta[:error] do
Span.set_status(ctx, OpenTelemetry.status(:error, inspect(error)))
end

Tracer.end_span()
OpentelemetryTelemetry.end_telemetry_span(@tracer_id, meta)
end

def handle_exception(
_event,
_measurements,
%{kind: kind, reason: reason, stacktrace: stacktrace} = meta,
_config
) do
ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, meta)

# try to normalize all errors to Elixir exceptions
exception = Exception.normalize(kind, reason, stacktrace)

# record exception and mark the span as errored
Span.record_exception(ctx, exception, stacktrace)
Span.set_status(ctx, OpenTelemetry.status(:error, inspect(reason)))

OpentelemetryTelemetry.end_telemetry_span(@tracer_id, meta)
end
end
43 changes: 27 additions & 16 deletions lib/commanded/process_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,29 @@ defmodule OpentelemetryCommanded.ProcessManager do
end

def handle_start(_event, _, meta, _) do
event = meta.recorded_event
trace_headers = decode_headers(event.metadata["trace_ctx"])
:otel_propagator_text_map.extract(trace_headers)
recorded_event = meta.recorded_event
safe_context_propagation(recorded_event.metadata["trace_ctx"])

attributes = [
application: meta.application,
"process_manager.uuid": meta.process_uuid,
"process_manager.name": meta.process_manager_name,
"process_manager.module": meta.process_manager_module,
"event.id": event.event_id,
"event.number": event.event_number,
"event.type": event.event_type,
"correlation.id": event.correlation_id,
"causation.id": event.causation_id,
"stream.id": event.stream_id,
"stream.version": event.stream_version
"messaging.system": "commanded",
"messaging.protocol": "cqrs",
"messaging.destination_kind": "process_manager",
"messaging.operation": "receive",
"messaging.message_id": recorded_event.causation_id,
"messaging.conversation_id": recorded_event.correlation_id,
"messaging.destination": meta.process_manager_module,
"messaging.commanded.application": meta.application,
"messaging.commanded.event": recorded_event.event_type,
"messaging.commanded.event_id": recorded_event.event_id,
"messaging.commanded.event_number": recorded_event.event_number,
"messaging.commanded.process_uuid": meta.process_uuid,
"messaging.commanded.stream_id": recorded_event.stream_id,
"messaging.commanded.stream_version": recorded_event.stream_version,
"messaging.commanded.handler_name": meta.process_manager_name
# TODO add back
# consistency: meta.consistency,
# TODO add this back into commanded
# "event.last_seen": meta.last_seen_event
Comment on lines +56 to +59
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left @davydog187 's comments.

Copy link
Contributor Author

@leggebroten leggebroten Mar 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bryannaegele
After reading a ton, I think that I should remove the messaging. prefix from the Commanded attributes.
This would also allow easier query building too I would think
Thoughts?

e.g.

      "messaging.commanded.application": meta.application,
      "messaging.commanded.event": recorded_event.event_type,
      "messaging.commanded.event_id": recorded_event.event_id,
      "messaging.commanded.event_number": recorded_event.event_number,
      "messaging.commanded.process_uuid": meta.process_uuid,
      "messaging.commanded.stream_id": recorded_event.stream_id,
      "messaging.commanded.stream_version": recorded_event.stream_version,
      "messaging.commanded.handler_name": meta.process_manager_name

should be

      "commanded.application": meta.application,
      "commanded.event": recorded_event.event_type,
      "commanded.event_id": recorded_event.event_id,
      "commanded.event_number": recorded_event.event_number,
      "commanded.process_uuid": meta.process_uuid,
      "commanded.stream_id": recorded_event.stream_id,
      "commanded.stream_version": recorded_event.stream_version,
      "commanded.handler_name": meta.process_manager_name

]

OpentelemetryTelemetry.start_telemetry_span(
Expand All @@ -67,7 +74,11 @@ defmodule OpentelemetryCommanded.ProcessManager do
ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, meta)

commands = Map.get(meta, :commands, [])
Span.set_attribute(ctx, :"command.count", Enum.count(commands))
Span.set_attribute(ctx, :"messaging.commanded.command_count", Enum.count(commands))

if error = meta[:error] do
Span.set_status(ctx, OpenTelemetry.status(:error, inspect(error)))
end

OpentelemetryTelemetry.end_telemetry_span(@tracer_id, meta)
end
Expand All @@ -85,7 +96,7 @@ defmodule OpentelemetryCommanded.ProcessManager do

# record exception and mark the span as errored
Span.record_exception(ctx, exception, stacktrace)
Span.set_status(ctx, OpenTelemetry.status(:error, ""))
Span.set_status(ctx, OpenTelemetry.status(:error, inspect(reason)))

OpentelemetryTelemetry.end_telemetry_span(@tracer_id, meta)
end
Expand Down
10 changes: 10 additions & 0 deletions lib/commanded/util.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
defmodule OpentelemetryCommanded.Util do
@moduledoc false

def safe_context_propagation(trace_ctx) when is_nil(trace_ctx) do
nil
end

def safe_context_propagation(trace_ctx) do
trace_ctx
|> decode_headers()
|> :otel_propagator_text_map.extract()
end

def encode_headers(headers), do: Enum.map(headers, &Tuple.to_list/1)

def decode_headers(headers), do: Enum.map(headers, &List.to_tuple/1)
Expand Down
17 changes: 16 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule OpentelemetryCommanded.MixProject do
app: :opentelemetry_commanded,
version: "0.2.0",
elixir: "~> 1.10",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
package: package(),
deps: deps(),
Expand All @@ -15,6 +16,15 @@ defmodule OpentelemetryCommanded.MixProject do
]
end

defp elixirc_paths(env) when env in [:test],
do: [
"lib",
"test/support",
"test/dummy_app"
]

defp elixirc_paths(_env), do: ["lib"]

# Run "mix help compile.app" to learn about applications.
def application do
[]
Expand All @@ -29,10 +39,15 @@ defmodule OpentelemetryCommanded.MixProject do

defp deps do
[
{:commanded, "~> 1.3.1"},
{:commanded,
github: "commanded/commanded",
ref: "75b19cb3a994aa36984b63bd9b5bffab4d6f8310"},
# {:commanded, "~> 1.3.1"},
{:opentelemetry_telemetry, "~> 1.0.0-beta.7"},
{:telemetry, "~> 1.0"},
{:opentelemetry, "~> 1.0"},
{:jason, "~> 1.2", only: :test},
{:ecto, "~> 3.7.1", only: :test},
{:ex_doc, "~> 0.23.0", only: [:dev], runtime: false}
]
end
Expand Down
5 changes: 4 additions & 1 deletion mix.lock
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
%{
"backoff": {:hex, :backoff, "1.1.6", "83b72ed2108ba1ee8f7d1c22e0b4a00cfe3593a67dbc792799e8cce9f42f796b", [:rebar3], [], "hexpm", "cf0cfff8995fb20562f822e5cc47d8ccf664c5ecdc26a684cbe85c225f9d7c39"},
"commanded": {:hex, :commanded, "1.3.1", "d18a73bface68c04cbbda69647604a3cc1918fbdf8af4a784fc3a3a30ca34a13", [:mix], [{:backoff, "~> 1.1", [hex: :backoff, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.0", [hex: :phoenix_pubsub, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_registry, "~> 0.2", [hex: :telemetry_registry, repo: "hexpm", optional: false]}], "hexpm", "9bd03ef6fc05e3a8fb4d0808f13a2106688e60ee4b2bdb78cf7e63a6788c9faf"},
"commanded": {:git, "https://github.com/commanded/commanded.git", "75b19cb3a994aa36984b63bd9b5bffab4d6f8310", [ref: "75b19cb3a994aa36984b63bd9b5bffab4d6f8310"]},
"decimal": {:hex, :decimal, "2.0.0", "a78296e617b0f5dd4c6caf57c714431347912ffb1d0842e998e9792b5642d697", [:mix], [], "hexpm", "34666e9c55dea81013e77d9d87370fe6cb6291d1ef32f46a1600230b1d44f577"},
"earmark_parser": {:hex, :earmark_parser, "1.4.10", "6603d7a603b9c18d3d20db69921527f82ef09990885ed7525003c7fe7dc86c56", [:mix], [], "hexpm", "8e2d5370b732385db2c9b22215c3f59c84ac7dda7ed7e544d7c459496ae519c0"},
"ecto": {:hex, :ecto, "3.7.1", "a20598862351b29f80f285b21ec5297da1181c0442687f9b8329f0445d228892", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "d36e5b39fc479e654cffd4dbe1865d9716e4a9b6311faff799b6f90ab81b8638"},
"elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"},
"ex_doc": {:hex, :ex_doc, "0.23.0", "a069bc9b0bf8efe323ecde8c0d62afc13d308b1fa3d228b65bca5cf8703a529d", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "f5e2c4702468b2fd11b10d39416ddadd2fcdd173ba2a0285ebd92c39827a5a16"},
"jason": {:hex, :jason, "1.3.0", "fa6b82a934feb176263ad2df0dbd91bf633d4a46ebfdffea0c8ae82953714946", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "53fc1f51255390e0ec7e50f9cb41e751c260d065dcba2bf0d08dc51a4002c2ac"},
"makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"},
"makeup_elixir": {:hex, :makeup_elixir, "0.15.0", "98312c9f0d3730fde4049985a1105da5155bfe5c11e47bdc7406d88e01e4219b", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "75ffa34ab1056b7e24844c90bfc62aaf6f3a37a15faa76b07bc5eba27e4a8b4a"},
"nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"},
Expand Down
Loading