From e79241e82d76c2fb1aea89dd678564c268c9fdd3 Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Mon, 16 Dec 2024 14:33:59 -0500 Subject: [PATCH] chore: separate http and grpc exporters --- exporter/chronicleexporter/exporter.go | 283 ------------------ exporter/chronicleexporter/exporter_test.go | 185 ------------ exporter/chronicleexporter/factory.go | 36 ++- exporter/chronicleexporter/grpc_exporter.go | 160 ++++++++++ exporter/chronicleexporter/http_exporter.go | 161 ++++++++++ .../chronicleexporter/http_exporter_test.go | 4 +- exporter/chronicleexporter/marshal.go | 20 +- exporter/chronicleexporter/marshal_test.go | 10 +- 8 files changed, 354 insertions(+), 505 deletions(-) delete mode 100644 exporter/chronicleexporter/exporter.go delete mode 100644 exporter/chronicleexporter/exporter_test.go create mode 100644 exporter/chronicleexporter/grpc_exporter.go create mode 100644 exporter/chronicleexporter/http_exporter.go diff --git a/exporter/chronicleexporter/exporter.go b/exporter/chronicleexporter/exporter.go deleted file mode 100644 index 0dbf31987..000000000 --- a/exporter/chronicleexporter/exporter.go +++ /dev/null @@ -1,283 +0,0 @@ -// Copyright observIQ, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package chronicleexporter - -import ( - "bytes" - "compress/gzip" - "context" - "fmt" - "io" - "net/http" - - "github.com/google/uuid" - "github.com/observiq/bindplane-agent/exporter/chronicleexporter/protos/api" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumererror" - "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/pdata/plog" - "go.uber.org/zap" - "golang.org/x/oauth2" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/credentials/oauth" - grpcgzip "google.golang.org/grpc/encoding/gzip" - "google.golang.org/grpc/status" - "google.golang.org/protobuf/encoding/protojson" -) - -const ( - grpcScope = "https://www.googleapis.com/auth/malachite-ingestion" - httpScope = "https://www.googleapis.com/auth/cloud-platform" -) - -type chronicleExporter struct { - cfg *Config - set component.TelemetrySettings - marshaler logMarshaler - exporterID string - - // fields used for gRPC - grpcClient api.IngestionServiceV2Client - grpcConn *grpc.ClientConn - metrics *hostMetricsReporter - - // fields used for HTTP - httpClient *http.Client -} - -func newExporter(cfg *Config, params exporter.Settings, exporterID string) (*chronicleExporter, error) { - customerID, err := uuid.Parse(cfg.CustomerID) - if err != nil { - return nil, fmt.Errorf("parse customer ID: %w", err) - } - - marshaller, err := newProtoMarshaler(*cfg, params.TelemetrySettings, customerID[:]) - if err != nil { - return nil, fmt.Errorf("create proto marshaller: %w", err) - } - - return &chronicleExporter{ - cfg: cfg, - set: params.TelemetrySettings, - marshaler: marshaller, - exporterID: exporterID, - }, nil -} - -func (ce *chronicleExporter) Start(ctx context.Context, _ component.Host) error { - ts, err := tokenSource(ctx, ce.cfg) - if err != nil { - return fmt.Errorf("load Google credentials: %w", err) - } - - if ce.cfg.Protocol == protocolHTTPS { - ce.httpClient = oauth2.NewClient(context.Background(), ts) - return nil - } - - endpoint, dialOpts := grpcClientParams(ce.cfg.Endpoint, ts) - conn, err := grpc.NewClient(endpoint, dialOpts...) - if err != nil { - return fmt.Errorf("dial: %w", err) - } - ce.grpcConn = conn - ce.grpcClient = api.NewIngestionServiceV2Client(conn) - - if ce.cfg.CollectAgentMetrics { - f := func(ctx context.Context, request *api.BatchCreateEventsRequest) error { - _, err := ce.grpcClient.BatchCreateEvents(ctx, request) - return err - } - metrics, err := newHostMetricsReporter(ce.cfg, ce.set, ce.exporterID, f) - if err != nil { - return fmt.Errorf("create metrics reporter: %w", err) - } - ce.metrics = metrics - ce.metrics.start() - } - - return nil -} - -func (ce *chronicleExporter) Shutdown(context.Context) error { - defer http.DefaultTransport.(*http.Transport).CloseIdleConnections() - if ce.cfg.Protocol == protocolHTTPS { - t := ce.httpClient.Transport.(*oauth2.Transport) - if t.Base != nil { - t.Base.(*http.Transport).CloseIdleConnections() - } - return nil - } - if ce.metrics != nil { - ce.metrics.shutdown() - } - if ce.grpcConn != nil { - if err := ce.grpcConn.Close(); err != nil { - return fmt.Errorf("connection close: %s", err) - } - } - return nil -} - -func (ce *chronicleExporter) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: false} -} - -func (ce *chronicleExporter) logsDataPusher(ctx context.Context, ld plog.Logs) error { - payloads, err := ce.marshaler.MarshalRawLogs(ctx, ld) - if err != nil { - return fmt.Errorf("marshal logs: %w", err) - } - - for _, payload := range payloads { - if err := ce.uploadToChronicle(ctx, payload); err != nil { - return err - } - } - - return nil -} - -func (ce *chronicleExporter) uploadToChronicle(ctx context.Context, request *api.BatchCreateLogsRequest) error { - if ce.metrics != nil { - totalLogs := int64(len(request.GetBatch().GetEntries())) - defer ce.metrics.recordSent(totalLogs) - } - - _, err := ce.grpcClient.BatchCreateLogs(ctx, request, ce.buildOptions()...) - if err != nil { - errCode := status.Code(err) - switch errCode { - // These errors are potentially transient - case codes.Canceled, - codes.Unavailable, - codes.DeadlineExceeded, - codes.ResourceExhausted, - codes.Aborted: - return fmt.Errorf("upload logs to chronicle: %w", err) - default: - return consumererror.NewPermanent(fmt.Errorf("upload logs to chronicle: %w", err)) - } - } - - return nil -} - -func (ce *chronicleExporter) buildOptions() []grpc.CallOption { - opts := make([]grpc.CallOption, 0) - - if ce.cfg.Compression == grpcgzip.Name { - opts = append(opts, grpc.UseCompressor(grpcgzip.Name)) - } - - return opts -} - -func (ce *chronicleExporter) logsHTTPDataPusher(ctx context.Context, ld plog.Logs) error { - payloads, err := ce.marshaler.MarshalRawLogsForHTTP(ctx, ld) - if err != nil { - return fmt.Errorf("marshal logs: %w", err) - } - - for logType, logTypePayloads := range payloads { - for _, payload := range logTypePayloads { - if err := ce.uploadToChronicleHTTP(ctx, payload, logType); err != nil { - return err - } - } - } - - return nil -} - -func (ce *chronicleExporter) uploadToChronicleHTTP(ctx context.Context, logs *api.ImportLogsRequest, logType string) error { - - data, err := protojson.Marshal(logs) - if err != nil { - return fmt.Errorf("marshal protobuf logs to JSON: %w", err) - } - - var body io.Reader - - if ce.cfg.Compression == grpcgzip.Name { - var b bytes.Buffer - gz := gzip.NewWriter(&b) - if _, err := gz.Write(data); err != nil { - return fmt.Errorf("gzip write: %w", err) - } - if err := gz.Close(); err != nil { - return fmt.Errorf("gzip close: %w", err) - } - body = &b - } else { - body = bytes.NewBuffer(data) - } - - request, err := http.NewRequestWithContext(ctx, "POST", httpEndpoint(ce.cfg, logType), body) - if err != nil { - return fmt.Errorf("create request: %w", err) - } - - if ce.cfg.Compression == grpcgzip.Name { - request.Header.Set("Content-Encoding", "gzip") - } - - request.Header.Set("Content-Type", "application/json") - - resp, err := ce.httpClient.Do(request) - if err != nil { - return fmt.Errorf("send request to Chronicle: %w", err) - } - defer resp.Body.Close() - - respBody, err := io.ReadAll(resp.Body) - if err == nil && resp.StatusCode == http.StatusOK { - return nil - } - - if err != nil { - ce.set.Logger.Warn("Failed to read response body", zap.Error(err)) - } else { - ce.set.Logger.Warn("Received non-OK response from Chronicle", zap.String("status", resp.Status), zap.ByteString("response", respBody)) - } - - // TODO interpret with https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/internal/coreinternal/errorutil/http.go - statusErr := fmt.Errorf("upload logs to chronicle: %s", resp.Status) - switch resp.StatusCode { - case http.StatusInternalServerError, http.StatusServiceUnavailable: // potentially transient - return statusErr - default: - return consumererror.NewPermanent(statusErr) - } -} - -// Override for testing -var grpcClientParams = func(cfgEndpoint string, ts oauth2.TokenSource) (string, []grpc.DialOption) { - return cfgEndpoint + ":443", []grpc.DialOption{ - grpc.WithPerRPCCredentials(oauth.TokenSource{TokenSource: ts}), - grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")), - } -} - -// This uses the DataPlane URL for the request -// URL for the request: https://{region}-chronicle.googleapis.com/{version}/projects/{project}/location/{region}/instances/{customerID} -// Override for testing -var httpEndpoint = func(cfg *Config, logType string) string { - formatString := "https://%s-%s/v1alpha/projects/%s/locations/%s/instances/%s/logTypes/%s/logs:import" - return fmt.Sprintf(formatString, cfg.Location, cfg.Endpoint, cfg.Project, cfg.Location, cfg.CustomerID, logType) -} diff --git a/exporter/chronicleexporter/exporter_test.go b/exporter/chronicleexporter/exporter_test.go deleted file mode 100644 index 25e680872..000000000 --- a/exporter/chronicleexporter/exporter_test.go +++ /dev/null @@ -1,185 +0,0 @@ -// Copyright observIQ, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package chronicleexporter - -import ( - "context" - "errors" - "testing" - - "github.com/golang/mock/gomock" - "github.com/observiq/bindplane-agent/exporter/chronicleexporter/protos/api" - "github.com/observiq/bindplane-agent/exporter/chronicleexporter/protos/api/mocks" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/consumer/consumererror" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -func TestLogsDataPusher(t *testing.T) { - - // Set up configuration, logger, and context - cfg := Config{Endpoint: defaultEndpoint} - ctx := context.Background() - - testCases := []struct { - desc string - setupExporter func() *chronicleExporter - setupMocks func(*mocks.MockIngestionServiceV2Client) - expectedErr string - permanentErr bool - }{ - { - desc: "successful push to Chronicle", - setupExporter: func() *chronicleExporter { - mockClient := mocks.NewMockIngestionServiceV2Client(gomock.NewController(t)) - marshaller := NewMockMarshaler(t) - marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return([]*api.BatchCreateLogsRequest{{}}, nil) - return &chronicleExporter{ - cfg: &cfg, - set: componenttest.NewNopTelemetrySettings(), - grpcClient: mockClient, - marshaler: marshaller, - } - }, - setupMocks: func(mockClient *mocks.MockIngestionServiceV2Client) { - mockClient.EXPECT().BatchCreateLogs(gomock.Any(), gomock.Any(), gomock.Any()).Return(&api.BatchCreateLogsResponse{}, nil) - }, - expectedErr: "", - }, - { - desc: "successful push to Chronicle with metrics", - setupExporter: func() *chronicleExporter { - mockClient := mocks.NewMockIngestionServiceV2Client(gomock.NewController(t)) - marshaller := NewMockMarshaler(t) - marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return([]*api.BatchCreateLogsRequest{{}}, nil) - cfg.CollectAgentMetrics = true - return &chronicleExporter{ - cfg: &cfg, - set: componenttest.NewNopTelemetrySettings(), - grpcClient: mockClient, - marshaler: marshaller, - } - }, - setupMocks: func(mockClient *mocks.MockIngestionServiceV2Client) { - mockClient.EXPECT().BatchCreateLogs(gomock.Any(), gomock.Any(), gomock.Any()).Return(&api.BatchCreateLogsResponse{}, nil) - }, - expectedErr: "", - }, - { - desc: "upload to Chronicle fails (transient)", - setupExporter: func() *chronicleExporter { - mockClient := mocks.NewMockIngestionServiceV2Client(gomock.NewController(t)) - marshaller := NewMockMarshaler(t) - marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return([]*api.BatchCreateLogsRequest{{}}, nil) - return &chronicleExporter{ - cfg: &cfg, - set: componenttest.NewNopTelemetrySettings(), - grpcClient: mockClient, - marshaler: marshaller, - } - }, - setupMocks: func(mockClient *mocks.MockIngestionServiceV2Client) { - // Simulate an error returned from the Chronicle service - err := status.Error(codes.Unavailable, "service unavailable") - mockClient.EXPECT().BatchCreateLogs(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, err) - }, - expectedErr: "service unavailable", - }, - { - desc: "upload to Chronicle fails (permanent)", - setupExporter: func() *chronicleExporter { - mockClient := mocks.NewMockIngestionServiceV2Client(gomock.NewController(t)) - marshaller := NewMockMarshaler(t) - marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return([]*api.BatchCreateLogsRequest{{}}, nil) - return &chronicleExporter{ - cfg: &cfg, - set: componenttest.NewNopTelemetrySettings(), - grpcClient: mockClient, - marshaler: marshaller, - } - }, - setupMocks: func(mockClient *mocks.MockIngestionServiceV2Client) { - err := status.Error(codes.InvalidArgument, "Invalid argument detected.") - mockClient.EXPECT().BatchCreateLogs(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, err) - }, - expectedErr: "Invalid argument detected.", - permanentErr: true, - }, - { - desc: "marshaler error", - setupExporter: func() *chronicleExporter { - mockClient := mocks.NewMockIngestionServiceV2Client(gomock.NewController(t)) - marshaller := NewMockMarshaler(t) - // Simulate an error during log marshaling - marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return(nil, errors.New("marshal error")) - return &chronicleExporter{ - cfg: &cfg, - set: componenttest.NewNopTelemetrySettings(), - grpcClient: mockClient, - marshaler: marshaller, - } - }, - setupMocks: func(_ *mocks.MockIngestionServiceV2Client) { - // No need to setup mocks for the client as the error occurs before the client is used - }, - expectedErr: "marshal error", - }, - { - desc: "empty log records", - setupExporter: func() *chronicleExporter { - mockClient := mocks.NewMockIngestionServiceV2Client(gomock.NewController(t)) - marshaller := NewMockMarshaler(t) - // Return an empty slice to simulate no logs to push - marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return([]*api.BatchCreateLogsRequest{}, nil) - return &chronicleExporter{ - cfg: &cfg, - set: componenttest.NewNopTelemetrySettings(), - grpcClient: mockClient, - marshaler: marshaller, - } - }, - setupMocks: func(_ *mocks.MockIngestionServiceV2Client) { - // Expect no calls to BatchCreateLogs since there are no logs to push - }, - expectedErr: "", - }, - } - - for _, tc := range testCases { - t.Run(tc.desc, func(t *testing.T) { - exporter := tc.setupExporter() - tc.setupMocks(exporter.grpcClient.(*mocks.MockIngestionServiceV2Client)) - - // Create a dummy plog.Logs to pass to logsDataPusher - logs := mockLogs(mockLogRecord("Test body", map[string]any{"key1": "value1"})) - - err := exporter.logsDataPusher(ctx, logs) - - if tc.expectedErr == "" { - require.NoError(t, err) - } else { - require.ErrorContains(t, err, tc.expectedErr) - if tc.permanentErr { - require.True(t, consumererror.IsPermanent(err), "Expected error to be permanent") - } else { - require.False(t, consumererror.IsPermanent(err), "Expected error to be transient") - } - } - }) - } -} diff --git a/exporter/chronicleexporter/factory.go b/exporter/chronicleexporter/factory.go index b7a058f2f..b2e296cca 100644 --- a/exporter/chronicleexporter/factory.go +++ b/exporter/chronicleexporter/factory.go @@ -16,7 +16,6 @@ package chronicleexporter import ( "context" - "errors" "github.com/observiq/bindplane-agent/exporter/chronicleexporter/internal/metadata" "go.opentelemetry.io/collector/component" @@ -64,30 +63,29 @@ func createLogsExporter( ctx context.Context, params exporter.Settings, cfg component.Config, -) (exporter.Logs, error) { - chronicleCfg, ok := cfg.(*Config) - if !ok { - return nil, errors.New("invalid config type") +) (exp exporter.Logs, err error) { + c := cfg.(*Config) + if c.Protocol == protocolHTTPS { + exp, err = newHTTPExporter(c, params) + if err != nil { + return nil, err + } + } else { + exp, err = newGRPCExporter(c, params) + if err != nil { + return nil, err + } } - exp, err := newExporter(chronicleCfg, params, params.ID.String()) - if err != nil { - return nil, err - } - - pusher := exp.logsDataPusher - if chronicleCfg.Protocol == protocolHTTPS { - pusher = exp.logsHTTPDataPusher - } return exporterhelper.NewLogs( ctx, params, - chronicleCfg, - pusher, + c, + exp.ConsumeLogs, exporterhelper.WithCapabilities(exp.Capabilities()), - exporterhelper.WithTimeout(chronicleCfg.TimeoutConfig), - exporterhelper.WithQueue(chronicleCfg.QueueConfig), - exporterhelper.WithRetry(chronicleCfg.BackOffConfig), + exporterhelper.WithTimeout(c.TimeoutConfig), + exporterhelper.WithQueue(c.QueueConfig), + exporterhelper.WithRetry(c.BackOffConfig), exporterhelper.WithStart(exp.Start), exporterhelper.WithShutdown(exp.Shutdown), ) diff --git a/exporter/chronicleexporter/grpc_exporter.go b/exporter/chronicleexporter/grpc_exporter.go new file mode 100644 index 000000000..a0c46d55c --- /dev/null +++ b/exporter/chronicleexporter/grpc_exporter.go @@ -0,0 +1,160 @@ +// Copyright observIQ, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package chronicleexporter + +import ( + "context" + "fmt" + "net/http" + + "github.com/observiq/bindplane-agent/exporter/chronicleexporter/protos/api" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/pdata/plog" + "golang.org/x/oauth2" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/oauth" + grpcgzip "google.golang.org/grpc/encoding/gzip" + "google.golang.org/grpc/status" +) + +const grpcScope = "https://www.googleapis.com/auth/malachite-ingestion" + +type grpcExporter struct { + cfg *Config + set component.TelemetrySettings + exporterID string + marshaler *protoMarshaler + + client api.IngestionServiceV2Client + conn *grpc.ClientConn + metrics *hostMetricsReporter +} + +func newGRPCExporter(cfg *Config, params exporter.Settings) (*grpcExporter, error) { + marshaler, err := newProtoMarshaler(cfg, params.TelemetrySettings) + if err != nil { + return nil, fmt.Errorf("create proto marshaler: %w", err) + } + return &grpcExporter{ + cfg: cfg, + set: params.TelemetrySettings, + exporterID: params.ID.String(), + marshaler: marshaler, + }, nil +} + +func (exp *grpcExporter) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + +func (exp *grpcExporter) Start(ctx context.Context, _ component.Host) error { + ts, err := tokenSource(ctx, exp.cfg) + if err != nil { + return fmt.Errorf("load Google credentials: %w", err) + } + endpoint, dialOpts := grpcClientParams(exp.cfg.Endpoint, ts) + conn, err := grpc.NewClient(endpoint, dialOpts...) + if err != nil { + return fmt.Errorf("dial: %w", err) + } + exp.conn = conn + exp.client = api.NewIngestionServiceV2Client(conn) + + if exp.cfg.CollectAgentMetrics { + f := func(ctx context.Context, request *api.BatchCreateEventsRequest) error { + _, err := exp.client.BatchCreateEvents(ctx, request) + return err + } + metrics, err := newHostMetricsReporter(exp.cfg, exp.set, exp.exporterID, f) + if err != nil { + return fmt.Errorf("create metrics reporter: %w", err) + } + exp.metrics = metrics + exp.metrics.start() + } + + return nil +} + +func (exp *grpcExporter) Shutdown(context.Context) error { + defer http.DefaultTransport.(*http.Transport).CloseIdleConnections() + if exp.metrics != nil { + exp.metrics.shutdown() + } + if exp.conn != nil { + if err := exp.conn.Close(); err != nil { + return fmt.Errorf("connection close: %s", err) + } + } + return nil +} + +func (exp *grpcExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error { + payloads, err := exp.marshaler.MarshalRawLogs(ctx, ld) + if err != nil { + return fmt.Errorf("marshal logs: %w", err) + } + for _, payload := range payloads { + if err := exp.uploadToChronicle(ctx, payload); err != nil { + return err + } + } + return nil +} + +func (exp *grpcExporter) uploadToChronicle(ctx context.Context, request *api.BatchCreateLogsRequest) error { + if exp.metrics != nil { + totalLogs := int64(len(request.GetBatch().GetEntries())) + defer exp.metrics.recordSent(totalLogs) + } + _, err := exp.client.BatchCreateLogs(ctx, request, exp.buildOptions()...) + if err != nil { + errCode := status.Code(err) + switch errCode { + // These errors are potentially transient + // TODO interpret with https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/internal/coreinternal/errorutil/grpc.go + case codes.Canceled, + codes.Unavailable, + codes.DeadlineExceeded, + codes.ResourceExhausted, + codes.Aborted: + return fmt.Errorf("upload logs to chronicle: %w", err) + default: + return consumererror.NewPermanent(fmt.Errorf("upload logs to chronicle: %w", err)) + } + } + return nil +} + +func (exp *grpcExporter) buildOptions() []grpc.CallOption { + opts := make([]grpc.CallOption, 0) + if exp.cfg.Compression == grpcgzip.Name { + opts = append(opts, grpc.UseCompressor(grpcgzip.Name)) + } + return opts +} + +// Override for testing +var grpcClientParams = func(cfgEndpoint string, ts oauth2.TokenSource) (string, []grpc.DialOption) { + return cfgEndpoint + ":443", []grpc.DialOption{ + grpc.WithPerRPCCredentials(oauth.TokenSource{TokenSource: ts}), + grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")), + } +} diff --git a/exporter/chronicleexporter/http_exporter.go b/exporter/chronicleexporter/http_exporter.go new file mode 100644 index 000000000..232036077 --- /dev/null +++ b/exporter/chronicleexporter/http_exporter.go @@ -0,0 +1,161 @@ +// Copyright observIQ, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package chronicleexporter + +import ( + "bytes" + "compress/gzip" + "context" + "errors" + "fmt" + "io" + "net/http" + + "github.com/observiq/bindplane-agent/exporter/chronicleexporter/protos/api" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/pdata/plog" + "go.uber.org/zap" + "golang.org/x/oauth2" + grpcgzip "google.golang.org/grpc/encoding/gzip" + "google.golang.org/protobuf/encoding/protojson" +) + +const httpScope = "https://www.googleapis.com/auth/cloud-platform" + +type httpExporter struct { + cfg *Config + set component.TelemetrySettings + marshaler *protoMarshaler + client *http.Client +} + +func newHTTPExporter(cfg *Config, params exporter.Settings) (*httpExporter, error) { + marshaler, err := newProtoMarshaler(cfg, params.TelemetrySettings) + if err != nil { + return nil, fmt.Errorf("create proto marshaler: %w", err) + } + return &httpExporter{ + cfg: cfg, + set: params.TelemetrySettings, + marshaler: marshaler, + }, nil +} + +func (exp *httpExporter) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + +func (exp *httpExporter) Start(ctx context.Context, _ component.Host) error { + ts, err := tokenSource(ctx, exp.cfg) + if err != nil { + return fmt.Errorf("load Google credentials: %w", err) + } + exp.client = oauth2.NewClient(context.Background(), ts) + return nil +} + +func (exp *httpExporter) Shutdown(context.Context) error { + defer http.DefaultTransport.(*http.Transport).CloseIdleConnections() + t := exp.client.Transport.(*oauth2.Transport) + if t.Base != nil { + t.Base.(*http.Transport).CloseIdleConnections() + } + return nil +} + +func (exp *httpExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error { + payloads, err := exp.marshaler.MarshalRawLogsForHTTP(ctx, ld) + if err != nil { + return fmt.Errorf("marshal logs: %w", err) + } + for logType, logTypePayloads := range payloads { + for _, payload := range logTypePayloads { + if err := exp.uploadToChronicleHTTP(ctx, payload, logType); err != nil { + return fmt.Errorf("upload to chronicle: %w", err) + } + } + } + return nil +} + +func (exp *httpExporter) uploadToChronicleHTTP(ctx context.Context, logs *api.ImportLogsRequest, logType string) error { + data, err := protojson.Marshal(logs) + if err != nil { + return fmt.Errorf("marshal protobuf logs to JSON: %w", err) + } + + var body io.Reader + if exp.cfg.Compression == grpcgzip.Name { + var b bytes.Buffer + gz := gzip.NewWriter(&b) + if _, err := gz.Write(data); err != nil { + return fmt.Errorf("gzip write: %w", err) + } + if err := gz.Close(); err != nil { + return fmt.Errorf("gzip close: %w", err) + } + body = &b + } else { + body = bytes.NewBuffer(data) + } + + request, err := http.NewRequestWithContext(ctx, "POST", httpEndpoint(exp.cfg, logType), body) + if err != nil { + return fmt.Errorf("create request: %w", err) + } + + if exp.cfg.Compression == grpcgzip.Name { + request.Header.Set("Content-Encoding", "gzip") + } + + request.Header.Set("Content-Type", "application/json") + + resp, err := exp.client.Do(request) + if err != nil { + return fmt.Errorf("send request to Chronicle: %w", err) + } + defer resp.Body.Close() + + respBody, err := io.ReadAll(resp.Body) + if err == nil && resp.StatusCode == http.StatusOK { + return nil + } + + if err != nil { + exp.set.Logger.Warn("Failed to read response body", zap.Error(err)) + } else { + exp.set.Logger.Warn("Received non-OK response from Chronicle", zap.String("status", resp.Status), zap.ByteString("response", respBody)) + } + + // TODO interpret with https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/internal/coreinternal/errorutil/http.go + statusErr := errors.New(resp.Status) + switch resp.StatusCode { + case http.StatusInternalServerError, http.StatusServiceUnavailable: // potentially transient + return statusErr + default: + return consumererror.NewPermanent(statusErr) + } +} + +// This uses the DataPlane URL for the request +// URL for the request: https://{region}-chronicle.googleapis.com/{version}/projects/{project}/location/{region}/instances/{customerID} +// Override for testing +var httpEndpoint = func(cfg *Config, logType string) string { + formatString := "https://%s-%s/v1alpha/projects/%s/locations/%s/instances/%s/logTypes/%s/logs:import" + return fmt.Sprintf(formatString, cfg.Location, cfg.Endpoint, cfg.Project, cfg.Location, cfg.CustomerID, logType) +} diff --git a/exporter/chronicleexporter/http_exporter_test.go b/exporter/chronicleexporter/http_exporter_test.go index f20b27445..491c10d8e 100644 --- a/exporter/chronicleexporter/http_exporter_test.go +++ b/exporter/chronicleexporter/http_exporter_test.go @@ -125,7 +125,7 @@ func TestHTTPExporter(t *testing.T) { return logs }(), expectedRequests: 1, - expectedErr: "upload logs to chronicle: 503 Service Unavailable", + expectedErr: "upload to chronicle: 503 Service Unavailable", permanentErr: false, }, { @@ -144,7 +144,7 @@ func TestHTTPExporter(t *testing.T) { return logs }(), expectedRequests: 1, - expectedErr: "Permanent error: upload logs to chronicle: 401 Unauthorized", + expectedErr: "upload to chronicle: Permanent error: 401 Unauthorized", permanentErr: true, }, } diff --git a/exporter/chronicleexporter/marshal.go b/exporter/chronicleexporter/marshal.go index 8243e9f6b..a12846239 100644 --- a/exporter/chronicleexporter/marshal.go +++ b/exporter/chronicleexporter/marshal.go @@ -54,14 +54,18 @@ type logMarshaler interface { MarshalRawLogsForHTTP(ctx context.Context, ld plog.Logs) (map[string][]*api.ImportLogsRequest, error) } type protoMarshaler struct { - cfg Config + cfg *Config teleSettings component.TelemetrySettings startTime time.Time customerID []byte collectorID []byte } -func newProtoMarshaler(cfg Config, teleSettings component.TelemetrySettings, customerID []byte) (*protoMarshaler, error) { +func newProtoMarshaler(cfg *Config, teleSettings component.TelemetrySettings) (*protoMarshaler, error) { + customerID, err := uuid.Parse(cfg.CustomerID) + if err != nil { + return nil, fmt.Errorf("parse customer ID: %w", err) + } return &protoMarshaler{ startTime: time.Now(), cfg: cfg, @@ -514,11 +518,6 @@ func (m *protoMarshaler) extractRawHTTPLogs(ctx context.Context, ld plog.Logs) ( return entries, nil } -func buildForwarderString(cfg Config) string { - format := "projects/%s/locations/%s/instances/%s/forwarders/%s" - return fmt.Sprintf(format, cfg.Project, cfg.Location, cfg.CustomerID, cfg.Forwarder) -} - func (m *protoMarshaler) constructHTTPPayloads(rawLogs map[string][]*api.Log) map[string][]*api.ImportLogsRequest { payloads := make(map[string][]*api.ImportLogsRequest, len(rawLogs)) @@ -570,9 +569,14 @@ func (m *protoMarshaler) buildHTTPRequest(entries []*api.Log) *api.ImportLogsReq Source: &api.ImportLogsRequest_InlineSource{ InlineSource: &api.ImportLogsRequest_LogsInlineSource{ - Forwarder: buildForwarderString(m.cfg), + Forwarder: m.buildForwarderString(), Logs: entries, }, }, } } + +func (m *protoMarshaler) buildForwarderString() string { + format := "projects/%s/locations/%s/instances/%s/forwarders/%s" + return fmt.Sprintf(format, m.cfg.Project, m.cfg.Location, m.cfg.CustomerID, m.cfg.Forwarder) +} diff --git a/exporter/chronicleexporter/marshal_test.go b/exporter/chronicleexporter/marshal_test.go index d7c219186..406a1f5a2 100644 --- a/exporter/chronicleexporter/marshal_test.go +++ b/exporter/chronicleexporter/marshal_test.go @@ -773,10 +773,7 @@ func TestProtoMarshaler_MarshalRawLogs(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - customerID, err := uuid.Parse(tt.cfg.CustomerID) - require.NoError(t, err) - - marshaler, err := newProtoMarshaler(tt.cfg, component.TelemetrySettings{Logger: logger}, customerID[:]) + marshaler, err := newProtoMarshaler(&tt.cfg, component.TelemetrySettings{Logger: logger}) marshaler.startTime = startTime require.NoError(t, err) @@ -1483,10 +1480,7 @@ func TestProtoMarshaler_MarshalRawLogsForHTTP(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - customerID, err := uuid.Parse(tt.cfg.CustomerID) - require.NoError(t, err) - - marshaler, err := newProtoMarshaler(tt.cfg, component.TelemetrySettings{Logger: logger}, customerID[:]) + marshaler, err := newProtoMarshaler(&tt.cfg, component.TelemetrySettings{Logger: logger}) marshaler.startTime = startTime require.NoError(t, err)