Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for rabbitmq/amqp091-go tracing #5

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ following caveats:

- **github.com/streadway/amqp**: Client and server instrumentation. *Only supported
with Go 1.7 and later.*
- **github.com/rabbitmq/amqp091-go**: Client and server instrumentation. *Only supported
with Go 1.7 and later.*

## Required Reading

Expand Down Expand Up @@ -76,7 +78,7 @@ between the producers and the consumers.
[OpenTracing project]: http://opentracing.io
[terminology]: http://opentracing.io/documentation/pages/spec.html
[OpenTracing API for Go]: https://github.com/opentracing/opentracing-go
[AMQP]: https://github.com/streadway/amqp
[AMQP]: https://github.com/rabbitmq/amqp091-go
[Build Status]: https://travis-ci.org/opentracing-contrib/go-amqp.svg
[GoDoc]: https://godoc.org/github.com/opentracing-contrib/go-amqp/amqptracer?status.svg
[check godoc]: https://godoc.org/github.com/opentracing-contrib/go-amqp/amqptracer
3 changes: 3 additions & 0 deletions amqp091tracer/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Package amqptracer provides OpenTracing instrumentation for the
// github.com/rabbitmq/amqp091-go package.
package amqp091tracer
39 changes: 39 additions & 0 deletions amqp091tracer/propagation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package amqp091tracer

// amqpHeadersCarrier satisfies both TextMapWriter and TextMapReader.
//
// Example usage for server side:
//
// carrier := amqpHeadersCarrier(amqp.Table)
// clientContext, err := tracer.Extract(
// opentracing.TextMap,
// carrier)
//
// Example usage for client side:
//
// carrier := amqpHeadersCarrier(amqp.Table)
// err := tracer.Inject(
// span.Context(),
// opentracing.TextMap,
// carrier)
//
type amqpHeadersCarrier map[string]interface{}

// ForeachKey conforms to the TextMapReader interface.
func (c amqpHeadersCarrier) ForeachKey(handler func(key, val string) error) error {
for k, val := range c {
v, ok := val.(string)
if !ok {
continue
}
if err := handler(k, v); err != nil {
return err
}
}
return nil
}

// Set implements Set() of opentracing.TextMapWriter.
func (c amqpHeadersCarrier) Set(key, val string) {
c[key] = val
}
51 changes: 51 additions & 0 deletions amqp091tracer/propagation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package amqp091tracer

import (
"strconv"
"testing"

opentracing "github.com/opentracing/opentracing-go"
)

func TestAMQPHeaderInject(t *testing.T) {
h := map[string]interface{}{}
h["NotOT"] = "blah"
h["opname"] = "AlsoNotOT"
tracer := testTracer{}
span := tracer.StartSpan("someSpan")
fakeID := span.Context().(testSpanContext).FakeID

// Use amqpHeadersCarrier to wrap around `h`.
carrier := amqpHeadersCarrier(h)
if err := span.Tracer().Inject(span.Context(), opentracing.TextMap, carrier); err != nil {
t.Fatal(err)
}

if len(h) != 3 {
t.Errorf("Unexpected header length: %v", len(h))
}
// The prefix comes from just above; the suffix comes from
// testTracer.Inject().
if h["testprefix-fakeid"] != strconv.Itoa(fakeID) {
t.Errorf("Could not find fakeid at expected key")
}
}

func TestAMQPHeaderExtract(t *testing.T) {
h := map[string]interface{}{}
h["NotOT"] = "blah"
h["opname"] = "AlsoNotOT"
h["testprefix-fakeid"] = "42"
tracer := testTracer{}

// Use amqpHeadersCarrier to wrap around `h`.
carrier := amqpHeadersCarrier(h)
spanContext, err := tracer.Extract(opentracing.TextMap, carrier)
if err != nil {
t.Fatal(err)
}

if spanContext.(testSpanContext).FakeID != 42 {
t.Errorf("Failed to read testprefix-fakeid correctly")
}
}
139 changes: 139 additions & 0 deletions amqp091tracer/testtracer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package amqp091tracer

import (
"strconv"
"strings"
"time"

opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/log"
)

const testHTTPHeaderPrefix = "testprefix-"

// testTracer is a most-noop Tracer implementation that makes it possible for
// unittests to verify whether certain methods were / were not called.
type testTracer struct{}

var fakeIDSource = 1

func nextFakeID() int {
fakeIDSource++
return fakeIDSource
}

type testSpanContext struct {
HasParent bool
FakeID int
}

func (n testSpanContext) ForeachBaggageItem(handler func(k, v string) bool) {}

type testSpan struct {
spanContext testSpanContext
OperationName string
StartTime time.Time
Tags map[string]interface{}
}

func (n testSpan) Equal(os opentracing.Span) bool {
other, ok := os.(testSpan)
if !ok {
return false
}
if n.spanContext != other.spanContext {
return false
}
if n.OperationName != other.OperationName {
return false
}
if !n.StartTime.Equal(other.StartTime) {
return false
}
if len(n.Tags) != len(other.Tags) {
return false
}

for k, v := range n.Tags {
if ov, ok := other.Tags[k]; !ok || ov != v {
return false
}
}

return true
}

// testSpan:
func (n testSpan) Context() opentracing.SpanContext { return n.spanContext }
func (n testSpan) SetTag(key string, value interface{}) opentracing.Span { return n }
func (n testSpan) Finish() {}
func (n testSpan) FinishWithOptions(opts opentracing.FinishOptions) {}
func (n testSpan) LogFields(fields ...log.Field) {}
func (n testSpan) LogKV(kvs ...interface{}) {}
func (n testSpan) SetOperationName(operationName string) opentracing.Span { return n }
func (n testSpan) Tracer() opentracing.Tracer { return testTracer{} }
func (n testSpan) SetBaggageItem(key, val string) opentracing.Span { return n }
func (n testSpan) BaggageItem(key string) string { return "" }
func (n testSpan) LogEvent(event string) {}
func (n testSpan) LogEventWithPayload(event string, payload interface{}) {}
func (n testSpan) Log(data opentracing.LogData) {}

// StartSpan belongs to the Tracer interface.
func (n testTracer) StartSpan(operationName string, opts ...opentracing.StartSpanOption) opentracing.Span {
sso := opentracing.StartSpanOptions{}
for _, o := range opts {
o.Apply(&sso)
}
return n.startSpanWithOptions(operationName, sso)
}

func (n testTracer) startSpanWithOptions(name string, opts opentracing.StartSpanOptions) opentracing.Span {
fakeID := nextFakeID()
if len(opts.References) > 0 {
fakeID = opts.References[0].ReferencedContext.(testSpanContext).FakeID
}

return testSpan{
OperationName: name,
StartTime: opts.StartTime,
Tags: opts.Tags,
spanContext: testSpanContext{
HasParent: len(opts.References) > 0,
FakeID: fakeID,
},
}
}

// Inject belongs to the Tracer interface.
func (n testTracer) Inject(sp opentracing.SpanContext, format interface{}, carrier interface{}) error {
spanContext := sp.(testSpanContext)
switch format {
case opentracing.HTTPHeaders, opentracing.TextMap:
carrier.(opentracing.TextMapWriter).Set(testHTTPHeaderPrefix+"fakeid", strconv.Itoa(spanContext.FakeID))
return nil
}
return opentracing.ErrUnsupportedFormat
}

// Extract belongs to the Tracer interface.
func (n testTracer) Extract(format interface{}, carrier interface{}) (opentracing.SpanContext, error) {
switch format {
case opentracing.HTTPHeaders, opentracing.TextMap:
// Just for testing purposes... generally not a worthwhile thing to
// propagate.
sm := testSpanContext{}
err := carrier.(opentracing.TextMapReader).ForeachKey(func(key, val string) error {
switch strings.ToLower(key) {
case testHTTPHeaderPrefix + "fakeid":
i, err := strconv.Atoi(val)
if err != nil {
return err
}
sm.FakeID = i
}
return nil
})
return sm, err
}
return nil, opentracing.ErrSpanContextNotFound
}
57 changes: 57 additions & 0 deletions amqp091tracer/tracer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package amqp091tracer

import (
opentracing "github.com/opentracing/opentracing-go"
amqp "github.com/rabbitmq/amqp091-go"
)

// Inject injects the span context into the AMQP header.
//
// Example:
//
// func PublishMessage(
// ctx context.Context,
// ch *amqp.Channel,
// exchange, key string,
// mandatory, immediate bool,
// msg *amqp.Publishing,
// ) error {
// sp := opentracing.SpanFromContext(ctx)
// defer sp.Finish()
//
// // Inject the span context into the AMQP header.
// if err := amqptracer.Inject(sp, msg.Headers); err != nil {
// return err
// }
//
// // Publish the message with the span context.
// return ch.Publish(exchange, key, mandatory, immediate, msg)
// }
func Inject(span opentracing.Span, hdrs amqp.Table) error {
c := amqpHeadersCarrier(hdrs)
return span.Tracer().Inject(span.Context(), opentracing.TextMap, c)
}

// Extract extracts the span context out of the AMQP header.
//
// Example:
//
// func ConsumeMessage(ctx context.Context, msg *amqp.Delivery) error {
// // Extract the span context out of the AMQP header.
// spCtx, _ := amqptracer.Extract(msg.Headers)
// sp := opentracing.StartSpan(
// "ConsumeMessage",
// opentracing.FollowsFrom(spCtx),
// )
// defer sp.Finish()
//
// // Update the context with the span for the subsequent reference.
// ctx = opentracing.ContextWithSpan(ctx, sp)
//
// // Actual message processing.
// return ProcessMessage(ctx, msg)
// }
func Extract(hdrs amqp.Table) (opentracing.SpanContext, error) {
c := amqpHeadersCarrier(hdrs)
return opentracing.GlobalTracer().Extract(opentracing.TextMap, c)
}
50 changes: 50 additions & 0 deletions amqp091tracer/tracer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package amqp091tracer

import (
"strconv"
"testing"

opentracing "github.com/opentracing/opentracing-go"
)

func TestInject(t *testing.T) {
h := map[string]interface{}{}
h["NotOT"] = "blah"
h["opname"] = "AlsoNotOT"
tracer := testTracer{}
sp := tracer.StartSpan("someSpan")
fakeID := sp.Context().(testSpanContext).FakeID

// Inject the tracing context to the AMQP header.
if err := Inject(sp, h); err != nil {
t.Fatal(err)
}

if len(h) != 3 {
t.Errorf("Unexpected header length: %v", len(h))
}
// The prefix comes from just above; the suffix comes from
// testTracer.Inject().
if h["testprefix-fakeid"] != strconv.Itoa(fakeID) {
t.Errorf("Could not find fakeid at expected key")
}
}

func TestExtract(t *testing.T) {
h := map[string]interface{}{}
h["NotOT"] = "blah"
h["opname"] = "AlsoNotOT"
h["testprefix-fakeid"] = "42"

// Set the testTracer as the global tracer.
opentracing.SetGlobalTracer(testTracer{})

// Extract the tracing span out from the AMQP header.
ctx, err := Extract(h)
if err != nil {
t.Fatal(err)
}
if ctx.(testSpanContext).FakeID != 42 {
t.Errorf("Failed to read testprefix-fakeid correctly")
}
}