From 4e84241dfd58419b43603ceaf75313087f8b7728 Mon Sep 17 00:00:00 2001 From: Kamal Al Marhubi Date: Sun, 23 Apr 2023 21:05:17 -0400 Subject: [PATCH] receiver/googlecloudpubsub: Add support for Cloud Logging messages Add a new `cloud_logging` encoding, which allows the receiver to consume messages from a Pub/Sub topic that's used as the destination of a Cloud Logging sink. refs #23184 --- receiver/googlecloudpubsubreceiver/README.md | 16 +- receiver/googlecloudpubsubreceiver/config.go | 3 +- receiver/googlecloudpubsubreceiver/go.mod | 15 +- receiver/googlecloudpubsubreceiver/go.sum | 6 + .../internal/common_protos.go | 19 + .../internal/log_entry.go | 598 ++++++++++++++++++ .../internal/log_entry_test.go | 118 ++++ .../googlecloudpubsubreceiver/receiver.go | 42 ++ 8 files changed, 806 insertions(+), 11 deletions(-) create mode 100644 receiver/googlecloudpubsubreceiver/internal/common_protos.go create mode 100644 receiver/googlecloudpubsubreceiver/internal/log_entry.go create mode 100644 receiver/googlecloudpubsubreceiver/internal/log_entry_test.go diff --git a/receiver/googlecloudpubsubreceiver/README.md b/receiver/googlecloudpubsubreceiver/README.md index 83cb6d0da907..4fa63d11e6a2 100644 --- a/receiver/googlecloudpubsubreceiver/README.md +++ b/receiver/googlecloudpubsubreceiver/README.md @@ -23,8 +23,8 @@ The following configuration options are supported: * `subscription` (Required): The subscription name to receive OTLP data from. The subscription name should be a fully qualified resource name (eg: `projects/otel-project/subscriptions/otlp`). * `encoding` (Optional): The encoding that will be used to received data from the subscription. This can either be - `otlp_proto_trace`, `otlp_proto_metric`, `otlp_proto_log`, or `raw_text` (see `encoding`). This will only be used as - a fallback, when no `content-type` attribute is present. + `otlp_proto_trace`, `otlp_proto_metric`, `otlp_proto_log`, `cloud_logging`, or `raw_text` (see `encoding`). 
This will + only be used as a fallback, when no `content-type` attribute is present. * `compression` (Optional): The compression that will be used on received data from the subscription. When set it can only be `gzip`. This will only be used as a fallback, when no `content-encoding` attribute is present. * `endpoint` (Optional): Override the default Pubsub Endpoint, useful when connecting to the PubSub emulator instance @@ -54,12 +54,20 @@ must the `encoding` field in the configuration be set. | - | - | otlp_proto_trace | Decode OTLP trace message | | - | - | otlp_proto_metric | Decode OTLP trace message | | - | - | otlp_proto_log | Decode OTLP trace message | +| - | - | cloud_logging | Decode [Cloud Logging]'s [LogEntry] message type | - | - | raw_text | Wrap in an OTLP log message | When the `encoding` configuration is set, the attributes on the message are ignored. -The receiver can be used for ingesting arbitrary text message on a Pubsub subscription and wrap them in OTLP Log -message, making it a convenient way to ingest log lines from Pubsub. +With `cloud_logging`, the receiver can be used to bring Cloud Logging messages into an OpenTelemetry pipeline. You'll +first need to [set up a logging sink][sink-docs] with a Pub/Sub topic as its destination. + +With `raw_text`, the receiver can be used for ingesting arbitrary text messages on a Pubsub subscription, wrapping them +in OTLP Log messages, making it a convenient way to ingest raw log lines from Pubsub. 
+ +[Cloud Logging]: https://cloud.google.com/logging +[LogEntry]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry +[sink-docs]: https://cloud.google.com/logging/docs/export/configure_export_v2#creating_sink ## Pubsub subscription diff --git a/receiver/googlecloudpubsubreceiver/config.go b/receiver/googlecloudpubsubreceiver/config.go index 82cf760119e4..83ad930a29dc 100644 --- a/receiver/googlecloudpubsubreceiver/config.go +++ b/receiver/googlecloudpubsubreceiver/config.go @@ -46,8 +46,9 @@ func (config *Config) validateForLog() error { case "otlp_proto_log": case "raw_text": case "raw_json": + case "cloud_logging": default: - return fmt.Errorf("log encoding %v is not supported. supported encoding formats include [otlp_proto_log,raw_text,raw_json]", config.Encoding) + return fmt.Errorf("log encoding %v is not supported. supported encoding formats include [otlp_proto_log,raw_text,raw_json,cloud_logging]", config.Encoding) } return nil } diff --git a/receiver/googlecloudpubsubreceiver/go.mod b/receiver/googlecloudpubsubreceiver/go.mod index 8d4e71fbf226..2813595e93ed 100644 --- a/receiver/googlecloudpubsubreceiver/go.mod +++ b/receiver/googlecloudpubsubreceiver/go.mod @@ -3,7 +3,11 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/google go 1.20 require ( + cloud.google.com/go/logging v1.7.0 cloud.google.com/go/pubsub v1.33.0 + github.com/google/go-cmp v0.5.9 + github.com/iancoleman/strcase v0.2.0 + github.com/json-iterator/go v1.1.12 github.com/stretchr/testify v1.8.4 go.opentelemetry.io/collector/component v0.86.1-0.20231006161201-d364ad61c4d7 go.opentelemetry.io/collector/confmap v0.86.1-0.20231006161201-d364ad61c4d7 @@ -11,9 +15,13 @@ require ( go.opentelemetry.io/collector/exporter v0.86.1-0.20231006161201-d364ad61c4d7 go.opentelemetry.io/collector/pdata v1.0.0-rcv0015.0.20231006161201-d364ad61c4d7 go.opentelemetry.io/collector/receiver v0.86.1-0.20231006161201-d364ad61c4d7 + go.uber.org/multierr v1.11.0 
go.uber.org/zap v1.26.0 google.golang.org/api v0.143.0 + google.golang.org/genproto v0.0.0-20230913181813-007df8e322eb + google.golang.org/genproto/googleapis/api v0.0.0-20230913181813-007df8e322eb google.golang.org/grpc v1.58.2 + google.golang.org/protobuf v1.31.0 ) require ( @@ -21,16 +29,15 @@ require ( cloud.google.com/go/compute v1.23.0 // indirect cloud.google.com/go/compute/metadata v0.2.3 // indirect cloud.google.com/go/iam v1.1.1 // indirect + cloud.google.com/go/longrunning v0.5.1 // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.3 // indirect - github.com/google/go-cmp v0.5.9 // indirect github.com/google/s2a-go v0.1.7 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.1 // indirect github.com/googleapis/gax-go/v2 v2.12.0 // indirect - github.com/json-iterator/go v1.1.12 // indirect github.com/knadh/koanf v1.5.0 // indirect github.com/knadh/koanf/v2 v2.0.1 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect @@ -47,7 +54,6 @@ require ( go.opentelemetry.io/otel v1.19.0 // indirect go.opentelemetry.io/otel/metric v1.19.0 // indirect go.opentelemetry.io/otel/trace v1.19.0 // indirect - go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.13.0 // indirect golang.org/x/net v0.15.0 // indirect golang.org/x/oauth2 v0.12.0 // indirect @@ -55,10 +61,7 @@ require ( golang.org/x/sys v0.12.0 // indirect golang.org/x/text v0.13.0 // indirect google.golang.org/appengine v1.6.7 // indirect - google.golang.org/genproto v0.0.0-20230913181813-007df8e322eb // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20230913181813-007df8e322eb // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230920204549-e6e6cdab5c13 // indirect - google.golang.org/protobuf v1.31.0 // indirect gopkg.in/yaml.v3 v3.0.1 // 
indirect ) diff --git a/receiver/googlecloudpubsubreceiver/go.sum b/receiver/googlecloudpubsubreceiver/go.sum index 4534937f7cca..05ac3755406b 100644 --- a/receiver/googlecloudpubsubreceiver/go.sum +++ b/receiver/googlecloudpubsubreceiver/go.sum @@ -8,6 +8,10 @@ cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGB cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= cloud.google.com/go/iam v1.1.1 h1:lW7fzj15aVIXYHREOqjRBV9PsH0Z6u8Y46a1YGvQP4Y= cloud.google.com/go/iam v1.1.1/go.mod h1:A5avdyVL2tCppe4unb0951eI9jreack+RJ0/d+KUZOU= +cloud.google.com/go/logging v1.7.0 h1:CJYxlNNNNAMkHp9em/YEXcfJg+rPDg7YfwoRpMU+t5I= +cloud.google.com/go/logging v1.7.0/go.mod h1:3xjP2CjkM3ZkO73aj4ASA5wRPGGCRrPIAeNqVNkzY8M= +cloud.google.com/go/longrunning v0.5.1 h1:Fr7TXftcqTudoyRJa113hyaqlGdiBQkp0Gq7tErFDWI= +cloud.google.com/go/longrunning v0.5.1/go.mod h1:spvimkwdz6SPWKEt/XBij79E9fiTkHSQl/fRUUQJYJc= cloud.google.com/go/pubsub v1.33.0 h1:6SPCPvWav64tj0sVX/+npCBKhUi/UjJehy9op/V3p2g= cloud.google.com/go/pubsub v1.33.0/go.mod h1:f+w71I33OMyxf9VpMVcZbnG5KSUkCOUHYpFd5U1GdRc= contrib.go.opencensus.io/exporter/prometheus v0.4.2 h1:sqfsYl5GIY/L570iT+l93ehxaWJs2/OwXtiWwew3oAg= @@ -158,6 +162,8 @@ github.com/hashicorp/vault/sdk v0.1.13/go.mod h1:B+hVj7TpuQY1Y/GPbCpffmgd+tSEwvh github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM= github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM= github.com/hjson/hjson-go/v4 v4.0.0/go.mod h1:KaYt3bTw3zhBjYqnXkYywcYctk0A2nxeEFTse3rH13E= +github.com/iancoleman/strcase v0.2.0 h1:05I4QRnGpI0m37iZQRuskXh+w77mr6Z41lwQzuHLwW0= +github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify 
v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg= diff --git a/receiver/googlecloudpubsubreceiver/internal/common_protos.go b/receiver/googlecloudpubsubreceiver/internal/common_protos.go new file mode 100644 index 000000000000..6d7656b19a56 --- /dev/null +++ b/receiver/googlecloudpubsubreceiver/internal/common_protos.go @@ -0,0 +1,19 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal" + +import ( + _ "google.golang.org/genproto/googleapis/cloud/audit" +) diff --git a/receiver/googlecloudpubsubreceiver/internal/log_entry.go b/receiver/googlecloudpubsubreceiver/internal/log_entry.go new file mode 100644 index 000000000000..243877e68230 --- /dev/null +++ b/receiver/googlecloudpubsubreceiver/internal/log_entry.go @@ -0,0 +1,598 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal" + +import ( + "bytes" + "context" + "encoding/hex" + stdjson "encoding/json" + jsoniter "github.com/json-iterator/go" + + "errors" + "fmt" + "strconv" + "strings" + "sync" + "time" + + "github.com/iancoleman/strcase" + "go.uber.org/zap" + + "cloud.google.com/go/logging/apiv2/loggingpb" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "google.golang.org/genproto/googleapis/api/monitoredres" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/reflect/protoreflect" + "google.golang.org/protobuf/reflect/protoregistry" + "google.golang.org/protobuf/types/known/anypb" +) + + +var json = jsoniter.ConfigCompatibleWithStandardLibrary + +var invalidTraceID = [16]byte{} +var invalidSpanID = [8]byte{} + +func cloudLoggingTraceToTraceIDBytes(trace string) [16]byte { + // Format: projects/my-gcp-project/traces/4ebc71f1def9274798cac4e8960d0095 + lastSlashIdx := strings.LastIndex(trace, "/") + if lastSlashIdx == -1 { + return invalidTraceID + } + traceIDStr := trace[lastSlashIdx+1:] + + return traceIDStrTotraceIDBytes(traceIDStr) +} + +func traceIDStrTotraceIDBytes(traceIDStr string) [16]byte { + traceIDSlice := [16]byte{} + decoded, err := hex.Decode(traceIDSlice[:], []byte(traceIDStr)) + if err != nil || decoded != 16 { + return invalidTraceID + } + + return traceIDSlice +} + +func spanIDStrToSpanIDBytes(spanIDStr string) [8]byte { + spanIDSlice := [8]byte{} 
+ decoded, err := hex.Decode(spanIDSlice[:], []byte(spanIDStr)) + if err != nil || decoded != 8 { + return invalidSpanID + } + + return spanIDSlice +} + +var desc protoreflect.MessageDescriptor +var descOnce sync.Once + +func getLogEntryDescriptor() protoreflect.MessageDescriptor { + descOnce.Do(func() { + var logEntry loggingpb.LogEntry + + desc = logEntry.ProtoReflect().Descriptor() + }) + + return desc +} + +// TranslateLogEntry translates a JSON-encoded LogEntry message into a pair of +// pcommon.Resource and plog.LogRecord, trying to keep as close as possible to +// the semantic conventions. +// +// For maximum fidelity, the decoding is done according to the protobuf message +// schema; this ensures that a numeric value in the input is correctly +// translated to either an integer or a double in the output. It falls back to +// plain JSON decoding if payload type is not available in the proto registry. +func TranslateLogEntry(ctx context.Context, logger *zap.Logger, data []byte) (pcommon.Resource, plog.LogRecord, error) { + lr := plog.NewLogRecord() + res := pcommon.NewResource() + + var src map[string]stdjson.RawMessage + err := json.Unmarshal(data, &src) + + if err != nil { + return res, lr, err + } + + resAttrs := res.Attributes() + attrs := lr.Attributes() + + for k, v := range src { + // Pick out some keys for special handling, and let the rest + // pass through to be translated according to the schema. + switch k { + // Unpack as suggested in the logs data model appendix + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/data-model-appendix.md#google-cloud-logging + case "timestamp": + // timestamp -> Timestamp + var t time.Time + err = json.Unmarshal(v, &t) + if err != nil { + return res, lr, err + } + lr.SetTimestamp(pcommon.NewTimestampFromTime(t)) + delete(src, k) + case "resource": + // resource -> Resource + // mapping type -> gcp.resource_type + // labels -> gcp.