diff --git a/README.md b/README.md index 1e8a79a..dea6b67 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,8 @@ following caveats: - **github.com/streadway/amqp**: Client and server instrumentation. *Only supported with Go 1.7 and later.* +- **github.com/rabbitmq/amqp091-go**: Client and server instrumentation. *Only supported + with Go 1.7 and later.* ## Required Reading @@ -76,7 +78,7 @@ between the producers and the consumers. [OpenTracing project]: http://opentracing.io [terminology]: http://opentracing.io/documentation/pages/spec.html [OpenTracing API for Go]: https://github.com/opentracing/opentracing-go -[AMQP]: https://github.com/streadway/amqp +[AMQP]: https://github.com/rabbitmq/amqp091-go [Build Status]: https://travis-ci.org/opentracing-contrib/go-amqp.svg [GoDoc]: https://godoc.org/github.com/opentracing-contrib/go-amqp/amqptracer?status.svg [check godoc]: https://godoc.org/github.com/opentracing-contrib/go-amqp/amqptracer diff --git a/amqp091tracer/doc.go b/amqp091tracer/doc.go new file mode 100644 index 0000000..d07c691 --- /dev/null +++ b/amqp091tracer/doc.go @@ -0,0 +1,3 @@ +// Package amqptracer provides OpenTracing instrumentation for the +// github.com/rabbitmq/amqp091-go package. +package amqp091tracer diff --git a/amqp091tracer/propagation.go b/amqp091tracer/propagation.go new file mode 100644 index 0000000..1fe3320 --- /dev/null +++ b/amqp091tracer/propagation.go @@ -0,0 +1,39 @@ +package amqp091tracer + +// amqpHeadersCarrier satisfies both TextMapWriter and TextMapReader. +// +// Example usage for server side: +// +// carrier := amqpHeadersCarrier(amqp.Table) +// clientContext, err := tracer.Extract( +// opentracing.TextMap, +// carrier) +// +// Example usage for client side: +// +// carrier := amqpHeadersCarrier(amqp.Table) +// err := tracer.Inject( +// span.Context(), +// opentracing.TextMap, +// carrier) +// +type amqpHeadersCarrier map[string]interface{} + +// ForeachKey conforms to the TextMapReader interface. +func (c amqpHeadersCarrier) ForeachKey(handler func(key, val string) error) error { + for k, val := range c { + v, ok := val.(string) + if !ok { + continue + } + if err := handler(k, v); err != nil { + return err + } + } + return nil +} + +// Set implements Set() of opentracing.TextMapWriter. +func (c amqpHeadersCarrier) Set(key, val string) { + c[key] = val +} diff --git a/amqp091tracer/propagation_test.go b/amqp091tracer/propagation_test.go new file mode 100644 index 0000000..1ba15ac --- /dev/null +++ b/amqp091tracer/propagation_test.go @@ -0,0 +1,51 @@ +package amqp091tracer + +import ( + "strconv" + "testing" + + opentracing "github.com/opentracing/opentracing-go" +) + +func TestAMQPHeaderInject(t *testing.T) { + h := map[string]interface{}{} + h["NotOT"] = "blah" + h["opname"] = "AlsoNotOT" + tracer := testTracer{} + span := tracer.StartSpan("someSpan") + fakeID := span.Context().(testSpanContext).FakeID + + // Use amqpHeadersCarrier to wrap around `h`. + carrier := amqpHeadersCarrier(h) + if err := span.Tracer().Inject(span.Context(), opentracing.TextMap, carrier); err != nil { + t.Fatal(err) + } + + if len(h) != 3 { + t.Errorf("Unexpected header length: %v", len(h)) + } + // The prefix comes from just above; the suffix comes from + // testTracer.Inject(). + if h["testprefix-fakeid"] != strconv.Itoa(fakeID) { + t.Errorf("Could not find fakeid at expected key") + } +} + +func TestAMQPHeaderExtract(t *testing.T) { + h := map[string]interface{}{} + h["NotOT"] = "blah" + h["opname"] = "AlsoNotOT" + h["testprefix-fakeid"] = "42" + tracer := testTracer{} + + // Use amqpHeadersCarrier to wrap around `h`. + carrier := amqpHeadersCarrier(h) + spanContext, err := tracer.Extract(opentracing.TextMap, carrier) + if err != nil { + t.Fatal(err) + } + + if spanContext.(testSpanContext).FakeID != 42 { + t.Errorf("Failed to read testprefix-fakeid correctly") + } +} diff --git a/amqp091tracer/testtracer_test.go b/amqp091tracer/testtracer_test.go new file mode 100644 index 0000000..821072f --- /dev/null +++ b/amqp091tracer/testtracer_test.go @@ -0,0 +1,139 @@ +package amqp091tracer + +import ( + "strconv" + "strings" + "time" + + opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/log" +) + +const testHTTPHeaderPrefix = "testprefix-" + +// testTracer is a most-noop Tracer implementation that makes it possible for +// unittests to verify whether certain methods were / were not called. +type testTracer struct{} + +var fakeIDSource = 1 + +func nextFakeID() int { + fakeIDSource++ + return fakeIDSource +} + +type testSpanContext struct { + HasParent bool + FakeID int +} + +func (n testSpanContext) ForeachBaggageItem(handler func(k, v string) bool) {} + +type testSpan struct { + spanContext testSpanContext + OperationName string + StartTime time.Time + Tags map[string]interface{} +} + +func (n testSpan) Equal(os opentracing.Span) bool { + other, ok := os.(testSpan) + if !ok { + return false + } + if n.spanContext != other.spanContext { + return false + } + if n.OperationName != other.OperationName { + return false + } + if !n.StartTime.Equal(other.StartTime) { + return false + } + if len(n.Tags) != len(other.Tags) { + return false + } + + for k, v := range n.Tags { + if ov, ok := other.Tags[k]; !ok || ov != v { + return false + } + } + + return true +} + +// testSpan: +func (n testSpan) Context() opentracing.SpanContext { return n.spanContext } +func (n testSpan) SetTag(key string, value interface{}) opentracing.Span { return n } +func (n testSpan) Finish() {} +func (n testSpan) FinishWithOptions(opts opentracing.FinishOptions) {} +func (n testSpan) LogFields(fields ...log.Field) {} +func (n testSpan) LogKV(kvs ...interface{}) {} +func (n testSpan) SetOperationName(operationName string) opentracing.Span { return n } +func (n testSpan) Tracer() opentracing.Tracer { return testTracer{} } +func (n testSpan) SetBaggageItem(key, val string) opentracing.Span { return n } +func (n testSpan) BaggageItem(key string) string { return "" } +func (n testSpan) LogEvent(event string) {} +func (n testSpan) LogEventWithPayload(event string, payload interface{}) {} +func (n testSpan) Log(data opentracing.LogData) {} + +// StartSpan belongs to the Tracer interface. +func (n testTracer) StartSpan(operationName string, opts ...opentracing.StartSpanOption) opentracing.Span { + sso := opentracing.StartSpanOptions{} + for _, o := range opts { + o.Apply(&sso) + } + return n.startSpanWithOptions(operationName, sso) +} + +func (n testTracer) startSpanWithOptions(name string, opts opentracing.StartSpanOptions) opentracing.Span { + fakeID := nextFakeID() + if len(opts.References) > 0 { + fakeID = opts.References[0].ReferencedContext.(testSpanContext).FakeID + } + + return testSpan{ + OperationName: name, + StartTime: opts.StartTime, + Tags: opts.Tags, + spanContext: testSpanContext{ + HasParent: len(opts.References) > 0, + FakeID: fakeID, + }, + } +} + +// Inject belongs to the Tracer interface. +func (n testTracer) Inject(sp opentracing.SpanContext, format interface{}, carrier interface{}) error { + spanContext := sp.(testSpanContext) + switch format { + case opentracing.HTTPHeaders, opentracing.TextMap: + carrier.(opentracing.TextMapWriter).Set(testHTTPHeaderPrefix+"fakeid", strconv.Itoa(spanContext.FakeID)) + return nil + } + return opentracing.ErrUnsupportedFormat +} + +// Extract belongs to the Tracer interface. +func (n testTracer) Extract(format interface{}, carrier interface{}) (opentracing.SpanContext, error) { + switch format { + case opentracing.HTTPHeaders, opentracing.TextMap: + // Just for testing purposes... generally not a worthwhile thing to + // propagate. + sm := testSpanContext{} + err := carrier.(opentracing.TextMapReader).ForeachKey(func(key, val string) error { + switch strings.ToLower(key) { + case testHTTPHeaderPrefix + "fakeid": + i, err := strconv.Atoi(val) + if err != nil { + return err + } + sm.FakeID = i + } + return nil + }) + return sm, err + } + return nil, opentracing.ErrSpanContextNotFound +} diff --git a/amqp091tracer/tracer.go b/amqp091tracer/tracer.go new file mode 100644 index 0000000..0c74398 --- /dev/null +++ b/amqp091tracer/tracer.go @@ -0,0 +1,57 @@ +package amqp091tracer + +import ( + opentracing "github.com/opentracing/opentracing-go" + amqp "github.com/rabbitmq/amqp091-go" +) + +// Inject injects the span context into the AMQP header. +// +// Example: +// +// func PublishMessage( +// ctx context.Context, +// ch *amqp.Channel, +// exchange, key string, +// mandatory, immediate bool, +// msg *amqp.Publishing, +// ) error { +// sp := opentracing.SpanFromContext(ctx) +// defer sp.Finish() +// +// // Inject the span context into the AMQP header. +// if err := amqptracer.Inject(sp, msg.Headers); err != nil { +// return err +// } +// +// // Publish the message with the span context. +// return ch.Publish(exchange, key, mandatory, immediate, msg) +// } +func Inject(span opentracing.Span, hdrs amqp.Table) error { + c := amqpHeadersCarrier(hdrs) + return span.Tracer().Inject(span.Context(), opentracing.TextMap, c) +} + +// Extract extracts the span context out of the AMQP header. +// +// Example: +// +// func ConsumeMessage(ctx context.Context, msg *amqp.Delivery) error { +// // Extract the span context out of the AMQP header. +// spCtx, _ := amqptracer.Extract(msg.Headers) +// sp := opentracing.StartSpan( +// "ConsumeMessage", +// opentracing.FollowsFrom(spCtx), +// ) +// defer sp.Finish() +// +// // Update the context with the span for the subsequent reference. +// ctx = opentracing.ContextWithSpan(ctx, sp) +// +// // Actual message processing. +// return ProcessMessage(ctx, msg) +// } +func Extract(hdrs amqp.Table) (opentracing.SpanContext, error) { + c := amqpHeadersCarrier(hdrs) + return opentracing.GlobalTracer().Extract(opentracing.TextMap, c) +} diff --git a/amqp091tracer/tracer_test.go b/amqp091tracer/tracer_test.go new file mode 100644 index 0000000..c8f5b8e --- /dev/null +++ b/amqp091tracer/tracer_test.go @@ -0,0 +1,50 @@ +package amqp091tracer + +import ( + "strconv" + "testing" + + opentracing "github.com/opentracing/opentracing-go" +) + +func TestInject(t *testing.T) { + h := map[string]interface{}{} + h["NotOT"] = "blah" + h["opname"] = "AlsoNotOT" + tracer := testTracer{} + sp := tracer.StartSpan("someSpan") + fakeID := sp.Context().(testSpanContext).FakeID + + // Inject the tracing context to the AMQP header. + if err := Inject(sp, h); err != nil { + t.Fatal(err) + } + + if len(h) != 3 { + t.Errorf("Unexpected header length: %v", len(h)) + } + // The prefix comes from just above; the suffix comes from + // testTracer.Inject(). + if h["testprefix-fakeid"] != strconv.Itoa(fakeID) { + t.Errorf("Could not find fakeid at expected key") + } +} + +func TestExtract(t *testing.T) { + h := map[string]interface{}{} + h["NotOT"] = "blah" + h["opname"] = "AlsoNotOT" + h["testprefix-fakeid"] = "42" + + // Set the testTracer as the global tracer. + opentracing.SetGlobalTracer(testTracer{}) + + // Extract the tracing span out from the AMQP header. + ctx, err := Extract(h) + if err != nil { + t.Fatal(err) + } + if ctx.(testSpanContext).FakeID != 42 { + t.Errorf("Failed to read testprefix-fakeid correctly") + } +}