Skip to content

Commit

Permalink
Add open telemetry interceptors
Browse files Browse the repository at this point in the history
  • Loading branch information
adibrastegarnia committed May 18, 2022
1 parent a0a578c commit 044cf1e
Show file tree
Hide file tree
Showing 7 changed files with 424 additions and 0 deletions.
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ require (
github.com/spf13/cobra v1.4.0
github.com/spf13/viper v1.11.0
github.com/stretchr/testify v1.7.1
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.32.0
go.opentelemetry.io/otel v1.7.0
go.opentelemetry.io/otel/trace v1.7.0
go.uber.org/zap v1.21.0
google.golang.org/grpc v1.46.0
google.golang.org/protobuf v1.28.0
Expand All @@ -21,6 +24,8 @@ require (
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/magiconair/properties v1.8.6 // indirect
Expand Down
16 changes: 16 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ cloud.google.com/go v0.65.0/go.mod h1:O5N8zS7uWy9vkA9vayVHs65eM1ubvY4h553ofrNHOb
cloud.google.com/go v0.72.0/go.mod h1:M+5Vjvlc2wnp6tjzE102Dw08nGShTscUx2nZMufOKPI=
cloud.google.com/go v0.74.0/go.mod h1:VV1xSbzvo+9QJOxLDaJfTjx5e+MePCpCWwvftOeQmWk=
cloud.google.com/go v0.75.0/go.mod h1:VGuuCn7PG0dwsd5XPVm2Mm3wlh3EL55/79EKB6hlPTY=
cloud.google.com/go v0.100.2 h1:t9Iw5QH5v4XtlEQaCtUY7x6sCABps8sW0acw7e2WQ6Y=
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE=
cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc=
cloud.google.com/go/bigquery v1.5.0/go.mod h1:snEHRnqQbz117VIFhE8bmtwIDY80NLUZUMb4Nv6dBIg=
cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4gLoIoXIAPc=
cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ=
cloud.google.com/go/compute v1.5.0 h1:b1zWmYuuHz7gO9kDcM/EpHGr06UgsYNRpNJzI2kFiLM=
cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk=
cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
Expand Down Expand Up @@ -77,6 +79,11 @@ github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw=
github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
Expand Down Expand Up @@ -121,6 +128,7 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
github.com/google/martian/v3 v3.1.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
Expand Down Expand Up @@ -214,6 +222,12 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.32.0 h1:WenoaOMNP71oq3KkMZ/jnxI9xU/JSCLw8yZILSI2lfU=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.32.0/go.mod h1:J0dBVrt7dPS/lKJyQoW0xzQiUr4r2Ik1VwPjAUWnofI=
go.opentelemetry.io/otel v1.7.0 h1:Z2lA3Tdch0iDcrhJXDIlC94XE+bxok1F9B+4Lz/lGsM=
go.opentelemetry.io/otel v1.7.0/go.mod h1:5BdUoMIz5WEs0vt0CUEMtSSaTSHBBVwrhnz7+nrD5xk=
go.opentelemetry.io/otel/trace v1.7.0 h1:O37Iogk1lEkMRXewVtZ1BBTVn5JEp8GrJvP92bJqC6o=
go.opentelemetry.io/otel/trace v1.7.0/go.mod h1:fzLSB9nqR2eXzxPXb2JW9IKE+ScyXA48yyE4TNvoHqU=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
Expand Down Expand Up @@ -307,6 +321,7 @@ golang.org/x/oauth2 v0.0.0-20200902213428-5d25da1a8d43/go.mod h1:KelEdhl1UZF7XfJ
golang.org/x/oauth2 v0.0.0-20201109201403-9fd604954f58/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210218202405-ba52d332ba99/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 h1:OSnWWcOd/CtWQC2cYSBgbTSJv3ciqd8r54ySIW2y3RE=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down Expand Up @@ -450,6 +465,7 @@ google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7
google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0=
google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c=
google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
Expand Down
38 changes: 38 additions & 0 deletions pkg/grpc/otelemetry/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// SPDX-FileCopyrightText: 2022-present Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0

package otelemetry

import (
"context"
"github.com/gogo/protobuf/proto"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

// Event telemetry event
type event struct {
id int
message interface{}
messageType attribute.KeyValue
}

// Send sends a telemetry event
func (e event) Send(ctx context.Context) {
span := trace.SpanFromContext(ctx)
if p, ok := e.message.(proto.Message); ok {
span.AddEvent("event", trace.WithAttributes(
e.messageType,
otelgrpc.RPCMessageIDKey.Int(e.id),
otelgrpc.RPCMessageUncompressedSizeKey.Int(proto.Size(p)),
))
} else {
span.AddEvent("event", trace.WithAttributes(
e.messageType,
otelgrpc.RPCMessageIDKey.Int(e.id),
))
}

}
115 changes: 115 additions & 0 deletions pkg/grpc/otelemetry/interceptors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// SPDX-FileCopyrightText: 2022-present Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0

package otelemetry

import (
"context"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/baggage"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
grpccodes "google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

// UnaryServerTelemetryInterceptor returns a grpc.UnaryServerInterceptor suitable
// for use in a grpc.NewServer call.
func UnaryServerTelemetryInterceptor(opts ...InstrumentationOption) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler,
) (interface{}, error) {
requestMetadata, _ := metadata.FromIncomingContext(ctx)
metadataCopy := requestMetadata.Copy()

instrumentation := NewInstrumentation(opts)
bags, spanCtx := instrumentation.Extract(ctx, &metadataCopy)
ctx = baggage.ContextWithBaggage(ctx, bags)

tracer := instrumentation.NewTracer(trace.WithInstrumentationVersion("1.0.0"))

name, attr := spanInfo(info.FullMethod, peerFromCtx(ctx))
ctx, span := tracer.Start(
trace.ContextWithRemoteSpanContext(ctx, spanCtx),
name,
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(attr...),
)

defer span.End()
e := event{
messageType: otelgrpc.RPCMessageTypeReceived,
message: req,
id: 1,
}
e.Send(ctx)

resp, err := handler(ctx, req)
if err != nil {
s, _ := status.FromError(err)
span.SetStatus(codes.Error, s.Message())
span.SetAttributes(statusCodeAttr(s.Code()))
e = event{
messageType: otelgrpc.RPCMessageTypeSent,
message: s.Proto(),
id: 1,
}
e.Send(ctx)

} else {
span.SetAttributes(statusCodeAttr(grpccodes.OK))
e = event{
messageType: otelgrpc.RPCMessageTypeSent,
message: resp,
id: 1,
}
}

return resp, err
}
}

// StreamTelemetryServerInterceptor returns a grpc.StreamServerInterceptor suitable
// for use in a grpc.NewServer call.
func StreamTelemetryServerInterceptor(opts ...InstrumentationOption) grpc.StreamServerInterceptor {
return func(
srv interface{},
stream grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
ctx := stream.Context()

requestMetadata, _ := metadata.FromIncomingContext(ctx)
metadataCopy := requestMetadata.Copy()

instrumentation := NewInstrumentation(opts)
bags, spanCtx := instrumentation.Extract(ctx, &metadataCopy)
ctx = baggage.ContextWithBaggage(ctx, bags)

tracer := instrumentation.NewTracer(trace.WithInstrumentationVersion("1.0.0"))

name, attr := spanInfo(info.FullMethod, peerFromCtx(ctx))
ctx, span := tracer.Start(
trace.ContextWithRemoteSpanContext(ctx, spanCtx),
name,
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(attr...),
)

defer span.End()
err := handler(srv, wrapServerStream(ctx, stream))

if err != nil {
s, _ := status.FromError(err)
span.SetStatus(codes.Error, s.Message())
span.SetAttributes(statusCodeAttr(s.Code()))
} else {
span.SetAttributes(statusCodeAttr(grpccodes.OK))
}

return err
}
}
118 changes: 118 additions & 0 deletions pkg/grpc/otelemetry/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// SPDX-FileCopyrightText: 2022-present Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0

package otelemetry

import (
"context"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/baggage"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/metadata"
)

// Instrumentation interface
type Instrumentation interface {
Extract(ctx context.Context, metadata *metadata.MD) (baggage.Baggage, trace.SpanContext)
Inject(ctx context.Context, metadata *metadata.MD)
}

// InstrumentationOptions is a group of options for this instrumentation.
type InstrumentationOptions struct {
propagators propagation.TextMapPropagator
tracerProvider trace.TracerProvider
name string
}

func (o *InstrumentationOptions) Inject(ctx context.Context, metadata *metadata.MD) {
o.propagators.Inject(ctx, &metadataInfo{
metadata: metadata,
})
}

func (o *InstrumentationOptions) Extract(ctx context.Context, metadata *metadata.MD) (baggage.Baggage, trace.SpanContext) {
ctx = o.propagators.Extract(ctx, &metadataInfo{
metadata: metadata,
})

return baggage.FromContext(ctx), trace.SpanContextFromContext(ctx)
}

// InstrumentationOption function
type InstrumentationOption func(*InstrumentationOptions)

// NewInstrumentation creates a configuration for an instrumentation using a set of given options
func NewInstrumentation(opts []InstrumentationOption) *InstrumentationOptions {
c := &InstrumentationOptions{
propagators: otel.GetTextMapPropagator(),
tracerProvider: trace.NewNoopTracerProvider(),
name: "",
}
for _, option := range opts {
option(c)
}

return c
}

func (o InstrumentationOptions) apply(opts ...InstrumentationOption) {
for _, opt := range opts {
opt(&o)
}
}

func (o InstrumentationOptions) NewTracer(opts ...trace.TracerOption) trace.Tracer {
return o.tracerProvider.Tracer(o.name, opts...)
}

// WithPropagators sets propagators
func WithPropagators(propagators propagation.TextMapPropagator) InstrumentationOption {
return func(options *InstrumentationOptions) {
options.propagators = propagators
}
}

// WithTraceProvider sets trace provider
func WithTraceProvider(tracerProvider trace.TracerProvider) InstrumentationOption {
return func(options *InstrumentationOptions) {
options.tracerProvider = tracerProvider
}
}

// WithInstrumentationName sets instrumentation name
func WithInstrumentationName(name string) InstrumentationOption {
return func(options *InstrumentationOptions) {
options.name = name
}
}

var _ Instrumentation = &InstrumentationOptions{}

type metadataInfo struct {
metadata *metadata.MD
}

// assert that metadataSupplier implements the TextMapCarrier interface
var _ propagation.TextMapCarrier = &metadataInfo{}

func (s *metadataInfo) Get(key string) string {
values := s.metadata.Get(key)
if len(values) == 0 {
return ""
}
return values[0]
}

func (s *metadataInfo) Set(key string, value string) {
s.metadata.Set(key, value)
}

func (s *metadataInfo) Keys() []string {
out := make([]string, 0, len(*s.metadata))
for key := range *s.metadata {
out = append(out, key)
}
return out
}
Loading

0 comments on commit 044cf1e

Please sign in to comment.