diff --git a/go.mod b/go.mod index ded53de..4bc3bf4 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,9 @@ require ( github.com/spf13/cobra v1.4.0 github.com/spf13/viper v1.11.0 github.com/stretchr/testify v1.7.1 + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.32.0 + go.opentelemetry.io/otel v1.7.0 + go.opentelemetry.io/otel/trace v1.7.0 go.uber.org/zap v1.21.0 google.golang.org/grpc v1.46.0 google.golang.org/protobuf v1.28.0 @@ -21,6 +24,8 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/fsnotify/fsnotify v1.5.1 // indirect + github.com/go-logr/logr v1.2.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/magiconair/properties v1.8.6 // indirect diff --git a/go.sum b/go.sum index 7f7f772..b970daf 100644 --- a/go.sum +++ b/go.sum @@ -17,12 +17,14 @@ cloud.google.com/go v0.65.0/go.mod h1:O5N8zS7uWy9vkA9vayVHs65eM1ubvY4h553ofrNHOb cloud.google.com/go v0.72.0/go.mod h1:M+5Vjvlc2wnp6tjzE102Dw08nGShTscUx2nZMufOKPI= cloud.google.com/go v0.74.0/go.mod h1:VV1xSbzvo+9QJOxLDaJfTjx5e+MePCpCWwvftOeQmWk= cloud.google.com/go v0.75.0/go.mod h1:VGuuCn7PG0dwsd5XPVm2Mm3wlh3EL55/79EKB6hlPTY= +cloud.google.com/go v0.100.2 h1:t9Iw5QH5v4XtlEQaCtUY7x6sCABps8sW0acw7e2WQ6Y= cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE= cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc= cloud.google.com/go/bigquery v1.5.0/go.mod h1:snEHRnqQbz117VIFhE8bmtwIDY80NLUZUMb4Nv6dBIg= cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4gLoIoXIAPc= cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ= +cloud.google.com/go/compute v1.5.0 h1:b1zWmYuuHz7gO9kDcM/EpHGr06UgsYNRpNJzI2kFiLM= cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE= cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk= cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I= @@ -77,6 +79,11 @@ github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2 github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= +github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= @@ -121,6 +128,7 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o= +github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= github.com/google/martian/v3 v3.1.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= @@ -214,6 +222,12 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.32.0 h1:WenoaOMNP71oq3KkMZ/jnxI9xU/JSCLw8yZILSI2lfU= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.32.0/go.mod h1:J0dBVrt7dPS/lKJyQoW0xzQiUr4r2Ik1VwPjAUWnofI= +go.opentelemetry.io/otel v1.7.0 h1:Z2lA3Tdch0iDcrhJXDIlC94XE+bxok1F9B+4Lz/lGsM= +go.opentelemetry.io/otel v1.7.0/go.mod h1:5BdUoMIz5WEs0vt0CUEMtSSaTSHBBVwrhnz7+nrD5xk= +go.opentelemetry.io/otel/trace v1.7.0 h1:O37Iogk1lEkMRXewVtZ1BBTVn5JEp8GrJvP92bJqC6o= +go.opentelemetry.io/otel/trace v1.7.0/go.mod h1:fzLSB9nqR2eXzxPXb2JW9IKE+ScyXA48yyE4TNvoHqU= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= @@ -307,6 +321,7 @@ golang.org/x/oauth2 v0.0.0-20200902213428-5d25da1a8d43/go.mod h1:KelEdhl1UZF7XfJ golang.org/x/oauth2 v0.0.0-20201109201403-9fd604954f58/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210218202405-ba52d332ba99/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= +golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 h1:OSnWWcOd/CtWQC2cYSBgbTSJv3ciqd8r54ySIW2y3RE= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -450,6 +465,7 @@ google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7 google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0= google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= +google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= diff --git a/pkg/grpc/otelemetry/event.go b/pkg/grpc/otelemetry/event.go new file mode 100644 index 0000000..1e1940b --- /dev/null +++ b/pkg/grpc/otelemetry/event.go @@ -0,0 +1,38 @@ +// SPDX-FileCopyrightText: 2022-present Intel Corporation +// +// SPDX-License-Identifier: Apache-2.0 + +package otelemetry + +import ( + "context" + "github.com/gogo/protobuf/proto" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +// Event telemetry event +type event struct { + id int + message interface{} + messageType attribute.KeyValue +} + +// Send sends a telemetry event +func (e event) Send(ctx context.Context) { + span := trace.SpanFromContext(ctx) + if p, ok := e.message.(proto.Message); ok { + span.AddEvent("event", trace.WithAttributes( + e.messageType, + otelgrpc.RPCMessageIDKey.Int(e.id), + otelgrpc.RPCMessageUncompressedSizeKey.Int(proto.Size(p)), + )) + } else { + span.AddEvent("event", trace.WithAttributes( + e.messageType, + otelgrpc.RPCMessageIDKey.Int(e.id), + )) + } + +} diff --git a/pkg/grpc/otelemetry/interceptors.go b/pkg/grpc/otelemetry/interceptors.go new file mode 100644 index 0000000..f2aa21e --- /dev/null +++ b/pkg/grpc/otelemetry/interceptors.go @@ -0,0 +1,115 @@ +// SPDX-FileCopyrightText: 2022-present Intel Corporation +// +// SPDX-License-Identifier: Apache-2.0 + +package otelemetry + +import ( + "context" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel/baggage" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc" + grpccodes "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" +) + +// UnaryServerTelemetryInterceptor returns a grpc.UnaryServerInterceptor suitable +// for use in a grpc.NewServer call. +func UnaryServerTelemetryInterceptor(opts ...InstrumentationOption) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler, + ) (interface{}, error) { + requestMetadata, _ := metadata.FromIncomingContext(ctx) + metadataCopy := requestMetadata.Copy() + + instrumentation := NewInstrumentation(opts) + bags, spanCtx := instrumentation.Extract(ctx, &metadataCopy) + ctx = baggage.ContextWithBaggage(ctx, bags) + + tracer := instrumentation.NewTracer(trace.WithInstrumentationVersion("1.0.0")) + + name, attr := spanInfo(info.FullMethod, peerFromCtx(ctx)) + ctx, span := tracer.Start( + trace.ContextWithRemoteSpanContext(ctx, spanCtx), + name, + trace.WithSpanKind(trace.SpanKindServer), + trace.WithAttributes(attr...), + ) + + defer span.End() + e := event{ + messageType: otelgrpc.RPCMessageTypeReceived, + message: req, + id: 1, + } + e.Send(ctx) + + resp, err := handler(ctx, req) + if err != nil { + s, _ := status.FromError(err) + span.SetStatus(codes.Error, s.Message()) + span.SetAttributes(statusCodeAttr(s.Code())) + e = event{ + messageType: otelgrpc.RPCMessageTypeSent, + message: s.Proto(), + id: 1, + } + e.Send(ctx) + + } else { + span.SetAttributes(statusCodeAttr(grpccodes.OK)) + e = event{ + messageType: otelgrpc.RPCMessageTypeSent, + message: resp, + id: 1, + } + } + + return resp, err + } +} + +// StreamTelemetryServerInterceptor returns a grpc.StreamServerInterceptor suitable +// for use in a grpc.NewServer call. +func StreamTelemetryServerInterceptor(opts ...InstrumentationOption) grpc.StreamServerInterceptor { + return func( + srv interface{}, + stream grpc.ServerStream, + info *grpc.StreamServerInfo, + handler grpc.StreamHandler, + ) error { + ctx := stream.Context() + + requestMetadata, _ := metadata.FromIncomingContext(ctx) + metadataCopy := requestMetadata.Copy() + + instrumentation := NewInstrumentation(opts) + bags, spanCtx := instrumentation.Extract(ctx, &metadataCopy) + ctx = baggage.ContextWithBaggage(ctx, bags) + + tracer := instrumentation.NewTracer(trace.WithInstrumentationVersion("1.0.0")) + + name, attr := spanInfo(info.FullMethod, peerFromCtx(ctx)) + ctx, span := tracer.Start( + trace.ContextWithRemoteSpanContext(ctx, spanCtx), + name, + trace.WithSpanKind(trace.SpanKindServer), + trace.WithAttributes(attr...), + ) + + defer span.End() + err := handler(srv, wrapServerStream(ctx, stream)) + + if err != nil { + s, _ := status.FromError(err) + span.SetStatus(codes.Error, s.Message()) + span.SetAttributes(statusCodeAttr(s.Code())) + } else { + span.SetAttributes(statusCodeAttr(grpccodes.OK)) + } + + return err + } +} diff --git a/pkg/grpc/otelemetry/options.go b/pkg/grpc/otelemetry/options.go new file mode 100644 index 0000000..6ce2122 --- /dev/null +++ b/pkg/grpc/otelemetry/options.go @@ -0,0 +1,118 @@ +// SPDX-FileCopyrightText: 2022-present Intel Corporation +// +// SPDX-License-Identifier: Apache-2.0 + +package otelemetry + +import ( + "context" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/baggage" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc/metadata" +) + +// Instrumentation interface +type Instrumentation interface { + Extract(ctx context.Context, metadata *metadata.MD) (baggage.Baggage, trace.SpanContext) + Inject(ctx context.Context, metadata *metadata.MD) +} + +// InstrumentationOptions is a group of options for this instrumentation. +type InstrumentationOptions struct { + propagators propagation.TextMapPropagator + tracerProvider trace.TracerProvider + name string +} + +func (o *InstrumentationOptions) Inject(ctx context.Context, metadata *metadata.MD) { + o.propagators.Inject(ctx, &metadataInfo{ + metadata: metadata, + }) +} + +func (o *InstrumentationOptions) Extract(ctx context.Context, metadata *metadata.MD) (baggage.Baggage, trace.SpanContext) { + ctx = o.propagators.Extract(ctx, &metadataInfo{ + metadata: metadata, + }) + + return baggage.FromContext(ctx), trace.SpanContextFromContext(ctx) +} + +// InstrumentationOption function +type InstrumentationOption func(*InstrumentationOptions) + +// NewInstrumentation creates a configuration for an instrumentation using a set of given options +func NewInstrumentation(opts []InstrumentationOption) *InstrumentationOptions { + c := &InstrumentationOptions{ + propagators: otel.GetTextMapPropagator(), + tracerProvider: trace.NewNoopTracerProvider(), + name: "", + } + for _, option := range opts { + option(c) + } + + return c +} + +func (o InstrumentationOptions) apply(opts ...InstrumentationOption) { + for _, opt := range opts { + opt(&o) + } +} + +func (o InstrumentationOptions) NewTracer(opts ...trace.TracerOption) trace.Tracer { + return o.tracerProvider.Tracer(o.name, opts...) +} + +// WithPropagators sets propagators +func WithPropagators(propagators propagation.TextMapPropagator) InstrumentationOption { + return func(options *InstrumentationOptions) { + options.propagators = propagators + } +} + +// WithTraceProvider sets trace provider +func WithTraceProvider(tracerProvider trace.TracerProvider) InstrumentationOption { + return func(options *InstrumentationOptions) { + options.tracerProvider = tracerProvider + } +} + +// WithInstrumentationName sets instrumentation name +func WithInstrumentationName(name string) InstrumentationOption { + return func(options *InstrumentationOptions) { + options.name = name + } +} + +var _ Instrumentation = &InstrumentationOptions{} + +type metadataInfo struct { + metadata *metadata.MD +} + +// assert that metadataSupplier implements the TextMapCarrier interface +var _ propagation.TextMapCarrier = &metadataInfo{} + +func (s *metadataInfo) Get(key string) string { + values := s.metadata.Get(key) + if len(values) == 0 { + return "" + } + return values[0] +} + +func (s *metadataInfo) Set(key string, value string) { + s.metadata.Set(key, value) +} + +func (s *metadataInfo) Keys() []string { + out := make([]string, 0, len(*s.metadata)) + for key := range *s.metadata { + out = append(out, key) + } + return out +} diff --git a/pkg/grpc/otelemetry/utils.go b/pkg/grpc/otelemetry/utils.go new file mode 100644 index 0000000..cd544ee --- /dev/null +++ b/pkg/grpc/otelemetry/utils.go @@ -0,0 +1,78 @@ +// SPDX-FileCopyrightText: 2022-present Intel Corporation +// +// SPDX-License-Identifier: Apache-2.0 + +package otelemetry + +import ( + "context" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel/attribute" + semconv "go.opentelemetry.io/otel/semconv/v1.10.0" + grpc_codes "google.golang.org/grpc/codes" + "google.golang.org/grpc/peer" + "net" + "strings" +) + +// spanInfo returns a span name and all appropriate attributes from the gRPC +// method and peer address. +func spanInfo(fullMethod, peerAddress string) (string, []attribute.KeyValue) { + attrs := []attribute.KeyValue{semconv.RPCSystemGRPC} + name, mAttrs := parseFullMethod(fullMethod) + attrs = append(attrs, mAttrs...) + attrs = append(attrs, peerAttr(peerAddress)...) + return name, attrs +} + +// peerAttr returns attributes about the peer address. +func peerAttr(addr string) []attribute.KeyValue { + host, port, err := net.SplitHostPort(addr) + if err != nil { + return []attribute.KeyValue(nil) + } + + if host == "" { + host = "127.0.0.1" + } + + return []attribute.KeyValue{ + semconv.NetPeerIPKey.String(host), + semconv.NetPeerPortKey.String(port), + } +} + +// peerFromCtx returns a peer address from a context, if one exists. +func peerFromCtx(ctx context.Context) string { + p, ok := peer.FromContext(ctx) + if !ok { + return "" + } + return p.Addr.String() +} + +// parseFullMethod returns a span name following the OpenTelemetry semantic +// conventions as well as all applicable span attribute.KeyValue attributes based +// on a gRPC's FullMethod. +func parseFullMethod(fullMethod string) (string, []attribute.KeyValue) { + name := strings.TrimLeft(fullMethod, "/") + parts := strings.SplitN(name, "/", 2) + if len(parts) != 2 { + // Invalid format, does not follow `/package.service/method`. + return name, []attribute.KeyValue(nil) + } + + var attrs []attribute.KeyValue + if service := parts[0]; service != "" { + attrs = append(attrs, semconv.RPCServiceKey.String(service)) + } + if method := parts[1]; method != "" { + attrs = append(attrs, semconv.RPCMethodKey.String(method)) + } + return name, attrs +} + +// statusCodeAttr returns status code attribute based on given gRPC code +func statusCodeAttr(c grpc_codes.Code) attribute.KeyValue { + return otelgrpc.GRPCStatusCodeKey.Int64(int64(c)) +} diff --git a/pkg/grpc/otelemetry/wrapstream.go b/pkg/grpc/otelemetry/wrapstream.go new file mode 100644 index 0000000..ff4a347 --- /dev/null +++ b/pkg/grpc/otelemetry/wrapstream.go @@ -0,0 +1,54 @@ +// SPDX-FileCopyrightText: 2022-present Intel Corporation +// +// SPDX-License-Identifier: Apache-2.0 + +package otelemetry + +import ( + "context" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "google.golang.org/grpc" +) + +type wrappedServerStream struct { + grpc.ServerStream + ctx context.Context + receivedMessageCounter int + sentMessageCounter int +} + +func (w *wrappedServerStream) RecvMsg(m interface{}) error { + err := w.ServerStream.RecvMsg(m) + + if err != nil { + return err + } + w.receivedMessageCounter++ + e := event{ + messageType: otelgrpc.RPCMessageTypeReceived, + id: w.receivedMessageCounter, + message: m, + } + e.Send(w.Context()) + + return nil +} + +func (w *wrappedServerStream) SendMsg(m interface{}) error { + err := w.ServerStream.SendMsg(m) + w.sentMessageCounter++ + e := event{ + messageType: otelgrpc.RPCMessageTypeSent, + message: m, + id: w.sentMessageCounter, + } + e.Send(w.Context()) + return err +} + +func wrapServerStream(ctx context.Context, stream grpc.ServerStream) *wrappedServerStream { + return &wrappedServerStream{ + ServerStream: stream, + ctx: ctx, + } +}