From 81760980df7e8292507ebeac1347906cdacfc444 Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Wed, 11 Dec 2024 13:13:25 -0500 Subject: [PATCH] Split HTTP and GRPC exporters --- exporter/chronicleexporter/config.go | 2 +- exporter/chronicleexporter/exporter.go | 330 ------------------ exporter/chronicleexporter/exporter_test.go | 171 --------- exporter/chronicleexporter/factory.go | 58 ++- exporter/chronicleexporter/factory_test.go | 2 +- exporter/chronicleexporter/grpc_exporter.go | 159 +++++++++ .../chronicleexporter/grpc_exporter_test.go | 182 ++++++++++ exporter/chronicleexporter/hostmetrics.go | 138 +++++--- exporter/chronicleexporter/http_exporter.go | 178 ++++++++++ .../chronicleexporter/http_exporter_test.go | 190 ++++++++++ exporter/chronicleexporter/marshal.go | 37 +- exporter/chronicleexporter/marshal_test.go | 4 +- .../chronicleexporter/mock_log_marshaler.go | 92 ----- .../protos/api/mocks/mock_service.go | 177 ---------- exporter/chronicleexporter/util.go | 48 +++ exporter/chronicleexporter/util_test.go | 31 ++ 16 files changed, 931 insertions(+), 868 deletions(-) delete mode 100644 exporter/chronicleexporter/exporter.go delete mode 100644 exporter/chronicleexporter/exporter_test.go create mode 100644 exporter/chronicleexporter/grpc_exporter.go create mode 100644 exporter/chronicleexporter/grpc_exporter_test.go create mode 100644 exporter/chronicleexporter/http_exporter.go create mode 100644 exporter/chronicleexporter/http_exporter_test.go delete mode 100644 exporter/chronicleexporter/mock_log_marshaler.go delete mode 100644 exporter/chronicleexporter/protos/api/mocks/mock_service.go create mode 100644 exporter/chronicleexporter/util.go create mode 100644 exporter/chronicleexporter/util_test.go diff --git a/exporter/chronicleexporter/config.go b/exporter/chronicleexporter/config.go index 0347cc26d..f493c5afa 100644 --- a/exporter/chronicleexporter/config.go +++ b/exporter/chronicleexporter/config.go @@ -104,7 +104,7 @@ type Config struct { // BatchRequestSizeLimitHTTP is the maximum batch request size, in bytes, that can be sent to Chronicle via the HTTP protocol // This field is defaulted to 1048576 as that is the default Chronicle backend limit // Setting this option to a value above the Chronicle backend limit may result in rejected log batch requests - BatchRequestSizeLimitHTTP int `mapstructure:"batch_request_size_limit_grpc"` + BatchRequestSizeLimitHTTP int `mapstructure:"batch_request_size_limit_http"` } // Validate checks if the configuration is valid. diff --git a/exporter/chronicleexporter/exporter.go b/exporter/chronicleexporter/exporter.go deleted file mode 100644 index a5dce88ed..000000000 --- a/exporter/chronicleexporter/exporter.go +++ /dev/null @@ -1,330 +0,0 @@ -// Copyright observIQ, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package chronicleexporter - -import ( - "bytes" - "compress/gzip" - "context" - "errors" - "fmt" - "io" - "net/http" - "os" - "sync" - "time" - - "github.com/google/uuid" - "github.com/observiq/bindplane-agent/exporter/chronicleexporter/protos/api" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumererror" - "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/pdata/plog" - "go.uber.org/zap" - "golang.org/x/oauth2" - "golang.org/x/oauth2/google" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/credentials/oauth" - grpcgzip "google.golang.org/grpc/encoding/gzip" - "google.golang.org/grpc/status" - "google.golang.org/protobuf/encoding/protojson" -) - -const ( - grpcScope = "https://www.googleapis.com/auth/malachite-ingestion" - httpScope = "https://www.googleapis.com/auth/cloud-platform" - - baseEndpoint = "malachiteingestion-pa.googleapis.com" -) - -type chronicleExporter struct { - cfg *Config - logger *zap.Logger - marshaler logMarshaler - collectorID, exporterID string - - // fields used for gRPC - grpcClient api.IngestionServiceV2Client - grpcConn *grpc.ClientConn - wg sync.WaitGroup - cancel context.CancelFunc - metrics *exporterMetrics - - // fields used for HTTP - httpClient *http.Client -} - -func newExporter(cfg *Config, params exporter.Settings, collectorID, exporterID string) (*chronicleExporter, error) { - customerID, err := uuid.Parse(cfg.CustomerID) - if err != nil { - return nil, fmt.Errorf("parse customer ID: %w", err) - } - - marshaller, err := newProtoMarshaler(*cfg, params.TelemetrySettings, customerID[:]) - if err != nil { - return nil, fmt.Errorf("create proto marshaller: %w", err) - } - - uuidCID, err := uuid.Parse(collectorID) - if err != nil { - return nil, fmt.Errorf("parse collector ID: %w", err) - } - - return &chronicleExporter{ - cfg: cfg, - logger: params.Logger, - metrics: newExporterMetrics(uuidCID[:], customerID[:], exporterID, cfg.Namespace), - marshaler: marshaller, - collectorID: collectorID, - exporterID: exporterID, - }, nil -} - -func (ce *chronicleExporter) Start(_ context.Context, _ component.Host) error { - creds, err := loadGoogleCredentials(ce.cfg) - if err != nil { - return fmt.Errorf("load Google credentials: %w", err) - } - - if ce.cfg.Protocol == protocolHTTPS { - ce.httpClient = oauth2.NewClient(context.Background(), creds.TokenSource) - return nil - } - - opts := []grpc.DialOption{ - // Apply OAuth tokens for each RPC call - grpc.WithPerRPCCredentials(oauth.TokenSource{TokenSource: creds.TokenSource}), - grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")), - } - conn, err := grpc.NewClient(ce.cfg.Endpoint+":443", opts...) 
- if err != nil { - return fmt.Errorf("dial: %w", err) - } - ce.grpcConn = conn - ce.grpcClient = api.NewIngestionServiceV2Client(conn) - - if ce.cfg.CollectAgentMetrics { - ctx, cancel := context.WithCancel(context.Background()) - ce.cancel = cancel - ce.wg.Add(1) - go ce.startHostMetricsCollection(ctx) - } - - return nil -} - -func (ce *chronicleExporter) Shutdown(context.Context) error { - defer http.DefaultTransport.(*http.Transport).CloseIdleConnections() - if ce.cfg.Protocol == protocolHTTPS { - t := ce.httpClient.Transport.(*oauth2.Transport) - if t.Base != nil { - t.Base.(*http.Transport).CloseIdleConnections() - } - return nil - } - if ce.cancel != nil { - ce.cancel() - ce.wg.Wait() - } - if ce.grpcConn != nil { - if err := ce.grpcConn.Close(); err != nil { - return fmt.Errorf("connection close: %s", err) - } - } - return nil -} - -func (ce *chronicleExporter) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: false} -} - -func loadGoogleCredentials(cfg *Config) (*google.Credentials, error) { - scope := grpcScope - if cfg.Protocol == protocolHTTPS { - scope = httpScope - } - - switch { - case cfg.Creds != "": - return google.CredentialsFromJSON(context.Background(), []byte(cfg.Creds), scope) - case cfg.CredsFilePath != "": - credsData, err := os.ReadFile(cfg.CredsFilePath) - if err != nil { - return nil, fmt.Errorf("read credentials file: %w", err) - } - - if len(credsData) == 0 { - return nil, errors.New("credentials file is empty") - } - - return google.CredentialsFromJSON(context.Background(), credsData, scope) - default: - return google.FindDefaultCredentials(context.Background(), scope) - } -} - -func (ce *chronicleExporter) logsDataPusher(ctx context.Context, ld plog.Logs) error { - payloads, err := ce.marshaler.MarshalRawLogs(ctx, ld) - if err != nil { - return fmt.Errorf("marshal logs: %w", err) - } - - for _, payload := range payloads { - if err := ce.uploadToChronicle(ctx, payload); err != nil { - return fmt.Errorf("upload to chronicle: %w", err) - } - } - - return nil -} - -func (ce *chronicleExporter) uploadToChronicle(ctx context.Context, request *api.BatchCreateLogsRequest) error { - totalLogs := int64(len(request.GetBatch().GetEntries())) - - _, err := ce.grpcClient.BatchCreateLogs(ctx, request, ce.buildOptions()...) - if err != nil { - errCode := status.Code(err) - switch errCode { - // These errors are potentially transient - case codes.Canceled, - codes.Unavailable, - codes.DeadlineExceeded, - codes.ResourceExhausted, - codes.Aborted: - return fmt.Errorf("upload logs to chronicle: %w", err) - default: - return consumererror.NewPermanent(fmt.Errorf("upload logs to chronicle: %w", err)) - } - } - - ce.metrics.addSentLogs(totalLogs) - ce.metrics.updateLastSuccessfulUpload() - return nil -} - -func (ce *chronicleExporter) buildOptions() []grpc.CallOption { - opts := make([]grpc.CallOption, 0) - - if ce.cfg.Compression == grpcgzip.Name { - opts = append(opts, grpc.UseCompressor(grpcgzip.Name)) - } - - return opts -} - -func (ce *chronicleExporter) startHostMetricsCollection(ctx context.Context) { - ticker := time.NewTicker(5 * time.Minute) - defer ticker.Stop() - - defer ce.wg.Done() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - err := ce.metrics.collectHostMetrics() - if err != nil { - ce.logger.Error("Failed to collect host metrics", zap.Error(err)) - } - request := ce.metrics.getAndReset() - _, err = ce.grpcClient.BatchCreateEvents(ctx, request, ce.buildOptions()...) 
- if err != nil { - ce.logger.Error("Failed to upload host metrics", zap.Error(err)) - } - } - } -} - -func (ce *chronicleExporter) logsHTTPDataPusher(ctx context.Context, ld plog.Logs) error { - payloads, err := ce.marshaler.MarshalRawLogsForHTTP(ctx, ld) - if err != nil { - return fmt.Errorf("marshal logs: %w", err) - } - - for logType, logTypePayloads := range payloads { - for _, payload := range logTypePayloads { - if err := ce.uploadToChronicleHTTP(ctx, payload, logType); err != nil { - return fmt.Errorf("upload to chronicle: %w", err) - } - } - } - - return nil -} - -// This uses the DataPlane URL for the request -// URL for the request: https://{region}-chronicle.googleapis.com/{version}/projects/{project}/location/{region}/instances/{customerID}/logTypes/{logtype}/logs:import -func buildEndpoint(cfg *Config, logType string) string { - // Location Endpoint Version Project Location Instance LogType - formatString := "https://%s-%s/%s/projects/%s/locations/%s/instances/%s/logTypes/%s/logs:import" - return fmt.Sprintf(formatString, cfg.Location, cfg.Endpoint, "v1alpha", cfg.Project, cfg.Location, cfg.CustomerID, logType) -} - -func (ce *chronicleExporter) uploadToChronicleHTTP(ctx context.Context, logs *api.ImportLogsRequest, logType string) error { - - data, err := protojson.Marshal(logs) - if err != nil { - return fmt.Errorf("marshal protobuf logs to JSON: %w", err) - } - - var body io.Reader - - if ce.cfg.Compression == grpcgzip.Name { - var b bytes.Buffer - gz := gzip.NewWriter(&b) - if _, err := gz.Write(data); err != nil { - return fmt.Errorf("gzip write: %w", err) - } - if err := gz.Close(); err != nil { - return fmt.Errorf("gzip close: %w", err) - } - body = &b - } else { - body = bytes.NewBuffer(data) - } - - request, err := http.NewRequestWithContext(ctx, "POST", buildEndpoint(ce.cfg, logType), body) - if err != nil { - return fmt.Errorf("create request: %w", err) - } - - if ce.cfg.Compression == grpcgzip.Name { - request.Header.Set("Content-Encoding", "gzip") - } - - request.Header.Set("Content-Type", "application/json") - - resp, err := ce.httpClient.Do(request) - if err != nil { - return fmt.Errorf("send request to Chronicle: %w", err) - } - defer resp.Body.Close() - - respBody, err := io.ReadAll(resp.Body) - if resp.StatusCode != http.StatusOK { - if err != nil { - ce.logger.Warn("Failed to read response body", zap.Error(err)) - } else { - ce.logger.Warn("Received non-OK response from Chronicle", zap.String("status", resp.Status), zap.ByteString("response", respBody)) - } - return fmt.Errorf("received non-OK response from Chronicle: %s", resp.Status) - } - - return nil -} diff --git a/exporter/chronicleexporter/exporter_test.go b/exporter/chronicleexporter/exporter_test.go deleted file mode 100644 index 805148540..000000000 --- a/exporter/chronicleexporter/exporter_test.go +++ /dev/null @@ -1,171 +0,0 @@ -// Copyright observIQ, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package chronicleexporter - -import ( - "context" - "errors" - "testing" - - "github.com/golang/mock/gomock" - "github.com/observiq/bindplane-agent/exporter/chronicleexporter/protos/api" - "github.com/observiq/bindplane-agent/exporter/chronicleexporter/protos/api/mocks" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/consumer/consumererror" - "go.uber.org/zap" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -func TestLogsDataPusher(t *testing.T) { - - // Set up configuration, logger, and context - cfg := Config{Endpoint: baseEndpoint} - ctx := context.Background() - - testCases := []struct { - desc string - setupExporter func() *chronicleExporter - setupMocks func(*mocks.MockIngestionServiceV2Client) - expectedErr string - permanentErr bool - }{ - { - desc: "successful push to Chronicle", - setupExporter: func() *chronicleExporter { - mockClient := mocks.NewMockIngestionServiceV2Client(gomock.NewController(t)) - marshaller := NewMockMarshaler(t) - marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return([]*api.BatchCreateLogsRequest{{}}, nil) - return &chronicleExporter{ - cfg: &cfg, - metrics: newExporterMetrics([]byte{}, []byte{}, "", cfg.Namespace), - logger: zap.NewNop(), - grpcClient: mockClient, - marshaler: marshaller, - } - }, - setupMocks: func(mockClient *mocks.MockIngestionServiceV2Client) { - mockClient.EXPECT().BatchCreateLogs(gomock.Any(), gomock.Any(), gomock.Any()).Return(&api.BatchCreateLogsResponse{}, nil) - }, - expectedErr: "", - }, - { - desc: "upload to Chronicle fails (transient)", - setupExporter: func() *chronicleExporter { - mockClient := mocks.NewMockIngestionServiceV2Client(gomock.NewController(t)) - marshaller := NewMockMarshaler(t) - marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return([]*api.BatchCreateLogsRequest{{}}, nil) - return &chronicleExporter{ - cfg: &cfg, - metrics: newExporterMetrics([]byte{}, []byte{}, "", cfg.Namespace), - logger: zap.NewNop(), - grpcClient: mockClient, - marshaler: marshaller, - } - }, - setupMocks: func(mockClient *mocks.MockIngestionServiceV2Client) { - // Simulate an error returned from the Chronicle service - err := status.Error(codes.Unavailable, "service unavailable") - mockClient.EXPECT().BatchCreateLogs(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, err) - }, - expectedErr: "service unavailable", - }, - { - desc: "upload to Chronicle fails (permanent)", - setupExporter: func() *chronicleExporter { - mockClient := mocks.NewMockIngestionServiceV2Client(gomock.NewController(t)) - marshaller := NewMockMarshaler(t) - marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return([]*api.BatchCreateLogsRequest{{}}, nil) - return &chronicleExporter{ - cfg: &cfg, - metrics: newExporterMetrics([]byte{}, []byte{}, "", cfg.Namespace), - logger: zap.NewNop(), - grpcClient: mockClient, - marshaler: marshaller, - } - }, - setupMocks: func(mockClient *mocks.MockIngestionServiceV2Client) { - err := status.Error(codes.InvalidArgument, "Invalid argument detected.") - mockClient.EXPECT().BatchCreateLogs(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, err) - }, - expectedErr: "Invalid argument detected.", - permanentErr: true, - }, - { - desc: "marshaler error", - setupExporter: func() *chronicleExporter { - mockClient := mocks.NewMockIngestionServiceV2Client(gomock.NewController(t)) - marshaller := NewMockMarshaler(t) - // Simulate an error during log marshaling - marshaller.On("MarshalRawLogs", 
mock.Anything, mock.Anything).Return(nil, errors.New("marshal error")) - return &chronicleExporter{ - cfg: &cfg, - metrics: newExporterMetrics([]byte{}, []byte{}, "", cfg.Namespace), - logger: zap.NewNop(), - grpcClient: mockClient, - marshaler: marshaller, - } - }, - setupMocks: func(_ *mocks.MockIngestionServiceV2Client) { - // No need to setup mocks for the client as the error occurs before the client is used - }, - expectedErr: "marshal error", - }, - { - desc: "empty log records", - setupExporter: func() *chronicleExporter { - mockClient := mocks.NewMockIngestionServiceV2Client(gomock.NewController(t)) - marshaller := NewMockMarshaler(t) - // Return an empty slice to simulate no logs to push - marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return([]*api.BatchCreateLogsRequest{}, nil) - return &chronicleExporter{ - cfg: &cfg, - metrics: newExporterMetrics([]byte{}, []byte{}, "", cfg.Namespace), - logger: zap.NewNop(), - grpcClient: mockClient, - marshaler: marshaller, - } - }, - setupMocks: func(_ *mocks.MockIngestionServiceV2Client) { - // Expect no calls to BatchCreateLogs since there are no logs to push - }, - expectedErr: "", - }, - } - - for _, tc := range testCases { - t.Run(tc.desc, func(t *testing.T) { - exporter := tc.setupExporter() - tc.setupMocks(exporter.grpcClient.(*mocks.MockIngestionServiceV2Client)) - - // Create a dummy plog.Logs to pass to logsDataPusher - logs := mockLogs(mockLogRecord("Test body", map[string]any{"key1": "value1"})) - - err := exporter.logsDataPusher(ctx, logs) - - if tc.expectedErr == "" { - require.NoError(t, err) - } else { - require.ErrorContains(t, err, tc.expectedErr) - if tc.permanentErr { - require.True(t, consumererror.IsPermanent(err), "Expected error to be permanent") - } else { - require.False(t, consumererror.IsPermanent(err), "Expected error to be transient") - } - } - }) - } -} diff --git a/exporter/chronicleexporter/factory.go b/exporter/chronicleexporter/factory.go index 6a2426576..2efcc1a72 100644 --- a/exporter/chronicleexporter/factory.go +++ b/exporter/chronicleexporter/factory.go @@ -16,15 +16,12 @@ package chronicleexporter import ( "context" - "errors" - "github.com/google/uuid" "github.com/observiq/bindplane-agent/exporter/chronicleexporter/internal/metadata" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" - semconv "go.opentelemetry.io/collector/semconv/v1.5.0" ) // NewFactory creates a new Chronicle exporter factory. @@ -35,10 +32,13 @@ func NewFactory() exporter.Factory { exporter.WithLogs(createLogsExporter, metadata.LogsStability)) } -const defaultBatchLogCountLimitGRPC = 1000 -const defaultBatchRequestSizeLimitGRPC = 1048576 -const defaultBatchLogCountLimitHTTP = 1000 -const defaultBatchRequestSizeLimitHTTP = 1048576 +const ( + defaultEndpoint = "malachiteingestion-pa.googleapis.com" + defaultBatchLogCountLimitGRPC = 1000 + defaultBatchRequestSizeLimitGRPC = 1048576 + defaultBatchLogCountLimitHTTP = 1000 + defaultBatchRequestSizeLimitHTTP = 1048576 +) // createDefaultConfig creates the default configuration for the exporter. 
func createDefaultConfig() component.Config { @@ -48,9 +48,9 @@ func createDefaultConfig() component.Config { QueueConfig: exporterhelper.NewDefaultQueueConfig(), BackOffConfig: configretry.NewDefaultBackOffConfig(), OverrideLogType: true, - Endpoint: baseEndpoint, Compression: noCompression, CollectAgentMetrics: true, + Endpoint: defaultEndpoint, BatchLogCountLimitGRPC: defaultBatchLogCountLimitGRPC, BatchRequestSizeLimitGRPC: defaultBatchRequestSizeLimitGRPC, BatchLogCountLimitHTTP: defaultBatchLogCountLimitHTTP, @@ -63,38 +63,28 @@ func createLogsExporter( ctx context.Context, params exporter.Settings, cfg component.Config, -) (exporter.Logs, error) { - chronicleCfg, ok := cfg.(*Config) - if !ok { - return nil, errors.New("invalid config type") - } - - var cID string - sid, ok := params.Resource.Attributes().Get(semconv.AttributeServiceInstanceID) - if ok { - cID = sid.AsString() +) (exp exporter.Logs, err error) { + c := cfg.(*Config) + if c.Protocol == protocolHTTPS { + exp, err = newHTTPExporter(c, params) + if err != nil { + return nil, err + } } else { - cID = uuid.New().String() - } - - exp, err := newExporter(chronicleCfg, params, cID, params.ID.String()) - if err != nil { - return nil, err - } - - pusher := exp.logsDataPusher - if chronicleCfg.Protocol == protocolHTTPS { - pusher = exp.logsHTTPDataPusher + exp, err = newGRPCExporter(c, params) + if err != nil { + return nil, err + } } return exporterhelper.NewLogs( ctx, params, - chronicleCfg, - pusher, + c, + exp.ConsumeLogs, exporterhelper.WithCapabilities(exp.Capabilities()), - exporterhelper.WithTimeout(chronicleCfg.TimeoutConfig), - exporterhelper.WithQueue(chronicleCfg.QueueConfig), - exporterhelper.WithRetry(chronicleCfg.BackOffConfig), + exporterhelper.WithTimeout(c.TimeoutConfig), + exporterhelper.WithQueue(c.QueueConfig), + exporterhelper.WithRetry(c.BackOffConfig), exporterhelper.WithStart(exp.Start), exporterhelper.WithShutdown(exp.Shutdown), ) diff --git a/exporter/chronicleexporter/factory_test.go b/exporter/chronicleexporter/factory_test.go index 7de7b7fef..b1d5605c2 100644 --- a/exporter/chronicleexporter/factory_test.go +++ b/exporter/chronicleexporter/factory_test.go @@ -38,6 +38,6 @@ func Test_createDefaultConfig(t *testing.T) { BatchRequestSizeLimitHTTP: defaultBatchRequestSizeLimitHTTP, } - actual := createDefaultConfig() + actual := NewFactory().CreateDefaultConfig() require.Equal(t, expectedCfg, actual) } diff --git a/exporter/chronicleexporter/grpc_exporter.go b/exporter/chronicleexporter/grpc_exporter.go new file mode 100644 index 000000000..5b8150b81 --- /dev/null +++ b/exporter/chronicleexporter/grpc_exporter.go @@ -0,0 +1,159 @@ +// Copyright observIQ, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package chronicleexporter + +import ( + "context" + "fmt" + "net/http" + + "github.com/google/uuid" + "github.com/observiq/bindplane-agent/exporter/chronicleexporter/protos/api" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/pdata/plog" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/oauth" + grpcgzip "google.golang.org/grpc/encoding/gzip" + "google.golang.org/grpc/status" +) + +const grpcScope = "https://www.googleapis.com/auth/malachite-ingestion" + +type grpcExporter struct { + cfg *Config + set component.TelemetrySettings + id string + marshaler *protoMarshaler + + client api.IngestionServiceV2Client + conn *grpc.ClientConn + metrics *hostMetricsReporter +} + +func newGRPCExporter(cfg *Config, params exporter.Settings) (*grpcExporter, error) { + customerID, err := uuid.Parse(cfg.CustomerID) + if err != nil { + return nil, fmt.Errorf("parse customer ID: %w", err) + } + marshaller, err := newProtoMarshaler(*cfg, params.TelemetrySettings, customerID[:]) + if err != nil { + return nil, fmt.Errorf("create proto marshaller: %w", err) + } + return &grpcExporter{ + cfg: cfg, + set: params.TelemetrySettings, + id: params.ID.String(), + marshaler: marshaller, + }, nil +} + +func (exp *grpcExporter) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + +func (exp *grpcExporter) Start(ctx context.Context, _ component.Host) error { + conn, err := getGRPCClient(ctx, exp.cfg) + if err != nil { + return fmt.Errorf("dial: %w", err) + } + exp.conn = conn + exp.client = api.NewIngestionServiceV2Client(conn) + + if exp.cfg.CollectAgentMetrics { + f := func(ctx context.Context, request *api.BatchCreateEventsRequest) error { + _, err := exp.client.BatchCreateEvents(ctx, request) + return err + } + metrics, err := newHostMetricsReporter(exp.cfg, exp.set, exp.id, f) + if err != nil { + return fmt.Errorf("create metrics reporter: %w", err) + } + exp.metrics = metrics + } + + return nil +} + +func (exp *grpcExporter) Shutdown(context.Context) error { + defer http.DefaultTransport.(*http.Transport).CloseIdleConnections() + if exp.conn != nil { + if err := exp.conn.Close(); err != nil { + return fmt.Errorf("connection close: %s", err) + } + } + return nil +} + +func (exp *grpcExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error { + payloads, err := exp.marshaler.MarshalRawLogs(ctx, ld) + if err != nil { + return fmt.Errorf("marshal logs: %w", err) + } + for _, payload := range payloads { + if err := exp.uploadToChronicle(ctx, payload); err != nil { + return err + } + } + return nil +} + +func (exp *grpcExporter) uploadToChronicle(ctx context.Context, request *api.BatchCreateLogsRequest) error { + totalLogs := int64(len(request.GetBatch().GetEntries())) + _, err := exp.client.BatchCreateLogs(ctx, request, exp.buildOptions()...) 
+ if err != nil { + errCode := status.Code(err) + switch errCode { + // These errors are potentially transient + // TODO interpret with https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/internal/coreinternal/errorutil/grpc.go + case codes.Canceled, + codes.Unavailable, + codes.DeadlineExceeded, + codes.ResourceExhausted, + codes.Aborted: + return fmt.Errorf("upload logs to chronicle: %w", err) + default: + return consumererror.NewPermanent(fmt.Errorf("upload logs to chronicle: %w", err)) + } + } + exp.metrics.addSentLogs(totalLogs) + exp.metrics.updateLastSuccessfulUpload() + return nil +} + +func (exp *grpcExporter) buildOptions() []grpc.CallOption { + opts := make([]grpc.CallOption, 0) + if exp.cfg.Compression == grpcgzip.Name { + opts = append(opts, grpc.UseCompressor(grpcgzip.Name)) + } + return opts +} + +var getGRPCClient func(context.Context, *Config) (*grpc.ClientConn, error) = buildGRPCClient + +func buildGRPCClient(ctx context.Context, cfg *Config) (*grpc.ClientConn, error) { + creds, err := googleCredentials(ctx, cfg) + if err != nil { + return nil, fmt.Errorf("load Google credentials: %w", err) + } + return grpc.NewClient(cfg.Endpoint+":443", + grpc.WithPerRPCCredentials(oauth.TokenSource{TokenSource: creds.TokenSource}), + grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")), + ) +} diff --git a/exporter/chronicleexporter/grpc_exporter_test.go b/exporter/chronicleexporter/grpc_exporter_test.go new file mode 100644 index 000000000..662706324 --- /dev/null +++ b/exporter/chronicleexporter/grpc_exporter_test.go @@ -0,0 +1,182 @@ +// Copyright observIQ, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package chronicleexporter + +import ( + "context" + "net" + "testing" + + "github.com/observiq/bindplane-agent/exporter/chronicleexporter/protos/api" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/pdata/plog" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" +) + +type mockGRPCServer struct { + api.UnimplementedIngestionServiceV2Server + srv *grpc.Server + requests int + handler mockBatchCreateLogsHandler +} + +var _ api.IngestionServiceV2Server = (*mockGRPCServer)(nil) + +type mockBatchCreateLogsHandler func(*api.BatchCreateLogsRequest) (*api.BatchCreateLogsResponse, error) + +func newMockGRPCServer(t *testing.T, handler mockBatchCreateLogsHandler) (*mockGRPCServer, string) { + mockServer := &mockGRPCServer{ + srv: grpc.NewServer(), + handler: handler, + } + ln, err := net.Listen("tcp", "localhost:") + require.NoError(t, err) + + mockServer.srv.RegisterService(&api.IngestionServiceV2_ServiceDesc, mockServer) + go func() { + require.NoError(t, mockServer.srv.Serve(ln)) + }() + return mockServer, ln.Addr().String() +} + +func (s *mockGRPCServer) BatchCreateEvents(ctx context.Context, req *api.BatchCreateEventsRequest) (*api.BatchCreateEventsResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "TODO") +} +func (s *mockGRPCServer) BatchCreateLogs(ctx context.Context, req *api.BatchCreateLogsRequest) (*api.BatchCreateLogsResponse, error) { + s.requests++ + return s.handler(req) +} + +func TestGRPCExporter(t *testing.T) { + // By default, tests will apply the following changes to NewFactory.CreateDefaultConfig() + defaultCfgMod := func(cfg *Config) { + cfg.Protocol = protocolGRPC + cfg.CustomerID = "00000000-1111-2222-3333-444444444444" + cfg.LogType = "FAKE" + cfg.QueueConfig.Enabled = false + cfg.BackOffConfig.Enabled = false + } + + testCases := []struct { + name string + handler mockBatchCreateLogsHandler + input plog.Logs + expectedRequests int + expectedErr string + permanentErr bool + }{ + { + name: "empty log record", + input: plog.NewLogs(), + expectedRequests: 0, + }, + { + name: "single log record", + handler: func(req *api.BatchCreateLogsRequest) (*api.BatchCreateLogsResponse, error) { + return &api.BatchCreateLogsResponse{}, nil + }, + input: func() plog.Logs { + logs := plog.NewLogs() + rls := logs.ResourceLogs().AppendEmpty() + sls := rls.ScopeLogs().AppendEmpty() + lrs := sls.LogRecords().AppendEmpty() + lrs.Body().SetStr("Test") + return logs + }(), + expectedRequests: 1, + }, + // TODO test splitting large payloads + { + name: "transient_error", + handler: func(req *api.BatchCreateLogsRequest) (*api.BatchCreateLogsResponse, error) { + return nil, status.Error(codes.Unavailable, "Service Unavailable") + }, + input: func() plog.Logs { + logs := plog.NewLogs() + rls := logs.ResourceLogs().AppendEmpty() + sls := rls.ScopeLogs().AppendEmpty() + lrs := sls.LogRecords().AppendEmpty() + lrs.Body().SetStr("Test") + return logs + }(), + expectedRequests: 1, + expectedErr: "upload logs to chronicle: rpc error: code = Unavailable desc = Service Unavailable", + permanentErr: false, + }, + { + name: "permanent_error", + handler: func(req *api.BatchCreateLogsRequest) (*api.BatchCreateLogsResponse, error) { + return nil, status.Error(codes.Unauthenticated, "Unauthorized") + }, + input: 
func() plog.Logs { + logs := plog.NewLogs() + rls := logs.ResourceLogs().AppendEmpty() + sls := rls.ScopeLogs().AppendEmpty() + lrs := sls.LogRecords().AppendEmpty() + lrs.Body().SetStr("Test") + return logs + }(), + expectedRequests: 1, + expectedErr: "Permanent error: upload logs to chronicle: rpc error: code = Unauthenticated desc = Unauthorized", + permanentErr: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + mockServer, endpoint := newMockGRPCServer(t, tc.handler) + defer mockServer.srv.GracefulStop() + + // Override the client builder so we can use the mock server + getGRPCClient = func(ctx context.Context, cfg *Config) (*grpc.ClientConn, error) { + return grpc.NewClient(endpoint, grpc.WithTransportCredentials(insecure.NewCredentials())) + } + defer func() { + getGRPCClient = buildGRPCClient + }() + + f := NewFactory() + cfg := f.CreateDefaultConfig().(*Config) + defaultCfgMod(cfg) + cfg.Endpoint = endpoint + + require.NoError(t, cfg.Validate()) + + ctx := context.Background() + exp, err := f.CreateLogs(ctx, exportertest.NewNopSettings(), cfg) + require.NoError(t, err) + require.NoError(t, exp.Start(ctx, componenttest.NewNopHost())) + defer func() { + require.NoError(t, exp.Shutdown(ctx)) + }() + + err = exp.ConsumeLogs(ctx, tc.input) + if tc.expectedErr == "" { + require.NoError(t, err) + } else { + require.EqualError(t, err, tc.expectedErr) + require.Equal(t, tc.permanentErr, consumererror.IsPermanent(err)) + } + + require.Equal(t, tc.expectedRequests, mockServer.requests) + }) + } +} diff --git a/exporter/chronicleexporter/hostmetrics.go b/exporter/chronicleexporter/hostmetrics.go index feccae783..ed5de40d9 100644 --- a/exporter/chronicleexporter/hostmetrics.go +++ b/exporter/chronicleexporter/hostmetrics.go @@ -15,6 +15,7 @@ package chronicleexporter import ( + "context" "fmt" "os" "sync" @@ -23,10 +24,18 @@ import ( "github.com/google/uuid" "github.com/observiq/bindplane-agent/exporter/chronicleexporter/protos/api" "github.com/shirou/gopsutil/v3/process" + "go.opentelemetry.io/collector/component" + semconv "go.opentelemetry.io/collector/semconv/v1.5.0" + "go.uber.org/zap" "google.golang.org/protobuf/types/known/timestamppb" ) -type exporterMetrics struct { +type hostMetricsReporter struct { + set component.TelemetrySettings + cancel context.CancelFunc + wg sync.WaitGroup + send sendMetricsFunc + mutex sync.Mutex agentID []byte customerID []byte @@ -38,32 +47,83 @@ type exporterMetrics struct { logsSent int64 } -func newExporterMetrics(agentID, customerID []byte, exporterID, namespace string) *exporterMetrics { +type sendMetricsFunc func(context.Context, *api.BatchCreateEventsRequest) error + +func newHostMetricsReporter(cfg *Config, set component.TelemetrySettings, expID string, send sendMetricsFunc) (*hostMetricsReporter, error) { + customerID, err := uuid.Parse(cfg.CustomerID) + if err != nil { + return nil, fmt.Errorf("parse customer ID: %w", err) + } + + agentID := uuid.New() + if sid, ok := set.Resource.Attributes().Get(semconv.AttributeServiceInstanceID); ok { + var err error + agentID, err = uuid.Parse(sid.AsString()) + if err != nil { + return nil, fmt.Errorf("parse collector ID: %w", err) + } + } + now := timestamppb.Now() - return &exporterMetrics{ - agentID: agentID, - exporterID: exporterID, + return &hostMetricsReporter{ + set: set, + send: send, startTime: now, - customerID: customerID, - namespace: namespace, + agentID: agentID[:], + exporterID: expID, + customerID: customerID[:], + namespace: cfg.Namespace, stats: 
&api.AgentStatsEvent{ + AgentId: agentID[:], WindowStartTime: now, - AgentId: agentID, StartTime: now, }, + }, nil +} + +func (hmr *hostMetricsReporter) start() error { + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() + + ctx, cancel := context.WithCancel(context.Background()) + hmr.cancel = cancel + hmr.wg.Add(1) + defer hmr.wg.Done() + + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + err := hmr.collectHostMetrics() + if err != nil { + hmr.set.Logger.Error("Failed to collect host metrics", zap.Error(err)) + } + request := hmr.getAndReset() + if err = hmr.send(ctx, request); err != nil { + hmr.set.Logger.Error("Failed to upload host metrics", zap.Error(err)) + } + } + } +} + +func (hmr *hostMetricsReporter) shutdown() { + if hmr.cancel != nil { + hmr.cancel() + hmr.wg.Wait() } } -func (cm *exporterMetrics) getAndReset() *api.BatchCreateEventsRequest { - cm.mutex.Lock() - defer cm.mutex.Unlock() +func (hmr *hostMetricsReporter) getAndReset() *api.BatchCreateEventsRequest { + hmr.mutex.Lock() + defer hmr.mutex.Unlock() now := timestamppb.Now() batchID := uuid.New() source := &api.EventSource{ CollectorId: chronicleCollectorID[:], - Namespace: cm.namespace, - CustomerId: cm.customerID, + Namespace: hmr.namespace, + CustomerId: hmr.customerID, } request := &api.BatchCreateEventsRequest{ @@ -71,44 +131,44 @@ func (cm *exporterMetrics) getAndReset() *api.BatchCreateEventsRequest { Id: batchID[:], Source: source, Type: api.EventBatch_AGENT_STATS, - StartTime: cm.startTime, + StartTime: hmr.startTime, Events: []*api.Event{ { Timestamp: now, CollectionTime: now, Source: source, Payload: &api.Event_AgentStats{ - AgentStats: cm.stats, + AgentStats: hmr.stats, }, }, }, }, } - cm.resetStats() + hmr.resetStats() return request } -func (cm *exporterMetrics) resetStats() { - cm.stats = &api.AgentStatsEvent{ +func (hmr *hostMetricsReporter) resetStats() { + hmr.stats = &api.AgentStatsEvent{ ExporterStats: []*api.ExporterStats{ { - Name: cm.exporterID, - AcceptedSpans: cm.logsSent, - RefusedSpans: cm.logsDropped, + Name: hmr.exporterID, + AcceptedSpans: hmr.logsSent, + RefusedSpans: hmr.logsDropped, }, }, - AgentId: cm.agentID, - StartTime: cm.startTime, + AgentId: hmr.agentID, + StartTime: hmr.startTime, WindowStartTime: timestamppb.Now(), } - cm.logsDropped = 0 - cm.logsSent = 0 + hmr.logsDropped = 0 + hmr.logsSent = 0 } -func (cm *exporterMetrics) collectHostMetrics() error { - cm.mutex.Lock() - defer cm.mutex.Unlock() +func (hmr *hostMetricsReporter) collectHostMetrics() error { + hmr.mutex.Lock() + defer hmr.mutex.Unlock() // Get the current process using the current PID proc, err := process.NewProcess(int32(os.Getpid())) @@ -124,14 +184,14 @@ func (cm *exporterMetrics) collectHostMetrics() error { totalCPUTime := cpuTimes.User + cpuTimes.System // convert to milliseconds - cm.stats.ProcessCpuSeconds = int64(totalCPUTime * 1000) + hmr.stats.ProcessCpuSeconds = int64(totalCPUTime * 1000) // Collect memory usage (RSS) memInfo, err := proc.MemoryInfo() if err != nil { return fmt.Errorf("get memory info: %w", err) } - cm.stats.ProcessMemoryRss = int64(memInfo.RSS / 1024) // Convert bytes to kilobytes + hmr.stats.ProcessMemoryRss = int64(memInfo.RSS / 1024) // Convert bytes to kilobytes // Calculate process uptime startTimeMs, err := proc.CreateTime() @@ -140,19 +200,19 @@ func (cm *exporterMetrics) collectHostMetrics() error { } startTimeSec := startTimeMs / 1000 currentTimeSec := time.Now().Unix() - cm.stats.ProcessUptime = currentTimeSec - startTimeSec + 
hmr.stats.ProcessUptime = currentTimeSec - startTimeSec return nil } -func (cm *exporterMetrics) updateLastSuccessfulUpload() { - cm.mutex.Lock() - defer cm.mutex.Unlock() - cm.stats.LastSuccessfulUploadTime = timestamppb.Now() +func (hmr *hostMetricsReporter) updateLastSuccessfulUpload() { + hmr.mutex.Lock() + defer hmr.mutex.Unlock() + hmr.stats.LastSuccessfulUploadTime = timestamppb.Now() } -func (cm *exporterMetrics) addSentLogs(count int64) { - cm.mutex.Lock() - defer cm.mutex.Unlock() - cm.logsSent += count +func (hmr *hostMetricsReporter) addSentLogs(count int64) { + hmr.mutex.Lock() + defer hmr.mutex.Unlock() + hmr.logsSent += count } diff --git a/exporter/chronicleexporter/http_exporter.go b/exporter/chronicleexporter/http_exporter.go new file mode 100644 index 000000000..51d898554 --- /dev/null +++ b/exporter/chronicleexporter/http_exporter.go @@ -0,0 +1,178 @@ +// Copyright observIQ, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package chronicleexporter + +import ( + "bytes" + "compress/gzip" + "context" + "errors" + "fmt" + "io" + "net/http" + + "github.com/google/uuid" + "github.com/observiq/bindplane-agent/exporter/chronicleexporter/protos/api" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/pdata/plog" + "go.uber.org/zap" + "golang.org/x/oauth2" + grpcgzip "google.golang.org/grpc/encoding/gzip" + "google.golang.org/protobuf/encoding/protojson" +) + +const httpScope = "https://www.googleapis.com/auth/cloud-platform" + +type httpExporter struct { + cfg *Config + set component.TelemetrySettings + marshaler *protoMarshaler + client *http.Client +} + +func newHTTPExporter(cfg *Config, params exporter.Settings) (*httpExporter, error) { + customerID, err := uuid.Parse(cfg.CustomerID) + if err != nil { + return nil, fmt.Errorf("parse customer ID: %w", err) + } + marshaller, err := newProtoMarshaler(*cfg, params.TelemetrySettings, customerID[:]) + if err != nil { + return nil, fmt.Errorf("create proto marshaller: %w", err) + } + return &httpExporter{ + cfg: cfg, + set: params.TelemetrySettings, + marshaler: marshaller, + }, nil +} + +func (exp *httpExporter) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + +func (exp *httpExporter) Start(ctx context.Context, _ component.Host) error { + ts, err := getTokenSource(ctx, exp.cfg) + if err != nil { + return fmt.Errorf("load Google credentials: %w", err) + } + exp.client = oauth2.NewClient(context.Background(), ts) + return nil +} + +func (exp *httpExporter) Shutdown(context.Context) error { + defer http.DefaultTransport.(*http.Transport).CloseIdleConnections() + t := exp.client.Transport.(*oauth2.Transport) + if t.Base != nil { + t.Base.(*http.Transport).CloseIdleConnections() + } + return nil +} + +func (exp *httpExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error { + payloads, err := 
exp.marshaler.MarshalRawLogsForHTTP(ctx, ld) + if err != nil { + return fmt.Errorf("marshal logs: %w", err) + } + for logType, logTypePayloads := range payloads { + for _, payload := range logTypePayloads { + if err := exp.uploadToChronicleHTTP(ctx, payload, logType); err != nil { + return fmt.Errorf("upload to chronicle: %w", err) + } + } + } + return nil +} + +func (exp *httpExporter) uploadToChronicleHTTP(ctx context.Context, logs *api.ImportLogsRequest, logType string) error { + data, err := protojson.Marshal(logs) + if err != nil { + return fmt.Errorf("marshal protobuf logs to JSON: %w", err) + } + + var body io.Reader + if exp.cfg.Compression == grpcgzip.Name { + var b bytes.Buffer + gz := gzip.NewWriter(&b) + if _, err := gz.Write(data); err != nil { + return fmt.Errorf("gzip write: %w", err) + } + if err := gz.Close(); err != nil { + return fmt.Errorf("gzip close: %w", err) + } + body = &b + } else { + body = bytes.NewBuffer(data) + } + + request, err := http.NewRequestWithContext(ctx, "POST", getHTTPEndpoint(exp.cfg, logType), body) + if err != nil { + return fmt.Errorf("create request: %w", err) + } + + if exp.cfg.Compression == grpcgzip.Name { + request.Header.Set("Content-Encoding", "gzip") + } + + request.Header.Set("Content-Type", "application/json") + + resp, err := exp.client.Do(request) + if err != nil { + return fmt.Errorf("send request to Chronicle: %w", err) + } + defer resp.Body.Close() + + respBody, err := io.ReadAll(resp.Body) + if err == nil && resp.StatusCode == http.StatusOK { + return nil + } + + if err != nil { + exp.set.Logger.Warn("Failed to read response body", zap.Error(err)) + } else { + exp.set.Logger.Warn("Received non-OK response from Chronicle", zap.String("status", resp.Status), zap.ByteString("response", respBody)) + } + + // TODO interpret with https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/internal/coreinternal/errorutil/http.go + statusErr := errors.New(resp.Status) + switch resp.StatusCode { + case http.StatusInternalServerError, http.StatusServiceUnavailable: // potentially transient + return statusErr + default: + return consumererror.NewPermanent(statusErr) + } +} + +// Override for testing +var getHTTPEndpoint func(*Config, string) string = buildHTTPEndpoint + +// This uses the DataPlane URL for the request +// URL for the request: https://{region}-chronicle.googleapis.com/{version}/projects/{project}/location/{region}/instances/{customerID} +func buildHTTPEndpoint(cfg *Config, logType string) string { + formatString := "https://%s-%s/v1alpha/projects/%s/locations/%s/instances/%s/logTypes/%s/logs:import" + return fmt.Sprintf(formatString, cfg.Location, cfg.Endpoint, cfg.Project, cfg.Location, cfg.CustomerID, logType) +} + +var getTokenSource func(context.Context, *Config) (oauth2.TokenSource, error) = googleTokenSource + +func googleTokenSource(ctx context.Context, cfg *Config) (oauth2.TokenSource, error) { + creds, err := googleCredentials(ctx, cfg) + if err != nil { + return nil, err + } + return creds.TokenSource, nil +} diff --git a/exporter/chronicleexporter/http_exporter_test.go b/exporter/chronicleexporter/http_exporter_test.go new file mode 100644 index 000000000..5ad6369d1 --- /dev/null +++ b/exporter/chronicleexporter/http_exporter_test.go @@ -0,0 +1,190 @@ +// Copyright observIQ, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package chronicleexporter + +import ( + "context" + "fmt" + "io" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/pdata/plog" +) + +type mockHTTPServer struct { + srv *httptest.Server + requests []string +} + +func newMockHTTPServer(logTypeHandlers map[string]http.HandlerFunc) *mockHTTPServer { + mockServer := mockHTTPServer{} + mux := http.NewServeMux() + for logType, handlerFunc := range logTypeHandlers { + pattern := fmt.Sprintf("/logTypes/%s/logs:import", logType) + mux.HandleFunc(pattern, func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + mockServer.requests = append(mockServer.requests, string(body)) + handlerFunc(w, r) + }) + } + mockServer.srv = httptest.NewServer(mux) + return &mockServer +} + +func TestHTTPExporter(t *testing.T) { + // Override the token source so that we don't have to provide real credentials + getTokenSource = testTokenSource + defer func() { + getTokenSource = googleTokenSource + }() + + // By default, tests will apply the following changes to NewFactory.CreateDefaultConfig() + defaultCfgMod := func(cfg *Config) { + cfg.Protocol = protocolHTTPS + cfg.Location = "us" + cfg.CustomerID = "00000000-1111-2222-3333-444444444444" + cfg.Project = "fake" + cfg.Forwarder = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" + cfg.LogType = "FAKE" + cfg.QueueConfig.Enabled = false + cfg.BackOffConfig.Enabled = false + } + + defaultHandlers := map[string]http.HandlerFunc{ + "FAKE": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + }, + } + + testCases := []struct { + name string + cfgMod func(cfg *Config) + handlers map[string]http.HandlerFunc + input plog.Logs + expectedRequests int + expectedErr string + permanentErr bool + }{ + { + name: "empty log record", + input: plog.NewLogs(), + expectedRequests: 0, + }, + { + name: "single log record", + input: func() plog.Logs { + logs := plog.NewLogs() + rls := logs.ResourceLogs().AppendEmpty() + sls := rls.ScopeLogs().AppendEmpty() + lrs := sls.LogRecords().AppendEmpty() + lrs.Body().SetStr("Test") + return logs + }(), + expectedRequests: 1, + }, + // TODO test splitting large payloads + { + name: "transient_error", + handlers: map[string]http.HandlerFunc{ + "FAKE": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusServiceUnavailable) + }, + }, + input: func() plog.Logs { + logs := plog.NewLogs() + rls := logs.ResourceLogs().AppendEmpty() + sls := rls.ScopeLogs().AppendEmpty() + lrs := sls.LogRecords().AppendEmpty() + lrs.Body().SetStr("Test") + return logs + }(), + expectedRequests: 1, + expectedErr: "upload to chronicle: 503 Service Unavailable", + permanentErr: false, + }, + { + name: "permanent_error", + handlers: map[string]http.HandlerFunc{ + "FAKE": func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusUnauthorized) + }, + }, + input: func() plog.Logs { + logs := 
plog.NewLogs() + rls := logs.ResourceLogs().AppendEmpty() + sls := rls.ScopeLogs().AppendEmpty() + lrs := sls.LogRecords().AppendEmpty() + lrs.Body().SetStr("Test") + return logs + }(), + expectedRequests: 1, + expectedErr: "upload to chronicle: Permanent error: 401 Unauthorized", + permanentErr: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Create a mock server so we are not dependent on the actual Chronicle service + handlers := defaultHandlers + if tc.handlers != nil { + handlers = tc.handlers + } + mockServer := newMockHTTPServer(handlers) + defer mockServer.srv.Close() + + // Override the endpoint builder so that we can point to the mock server + getHTTPEndpoint = func(_ *Config, logType string) string { + return fmt.Sprintf("%s/logTypes/%s/logs:import", mockServer.srv.URL, logType) + } + defer func() { + getHTTPEndpoint = buildHTTPEndpoint + }() + + f := NewFactory() + cfg := f.CreateDefaultConfig().(*Config) + if tc.cfgMod != nil { + tc.cfgMod(cfg) + } else { + defaultCfgMod(cfg) + } + require.NoError(t, cfg.Validate()) + + ctx := context.Background() + exp, err := f.CreateLogs(ctx, exportertest.NewNopSettings(), cfg) + require.NoError(t, err) + require.NoError(t, exp.Start(ctx, componenttest.NewNopHost())) + defer func() { + require.NoError(t, exp.Shutdown(ctx)) + }() + + err = exp.ConsumeLogs(ctx, tc.input) + if tc.expectedErr == "" { + require.NoError(t, err) + } else { + require.EqualError(t, err, tc.expectedErr) + require.Equal(t, tc.permanentErr, consumererror.IsPermanent(err)) + } + + require.Equal(t, tc.expectedRequests, len(mockServer.requests)) + }) + } +} diff --git a/exporter/chronicleexporter/marshal.go b/exporter/chronicleexporter/marshal.go index 8243e9f6b..f40a871ec 100644 --- a/exporter/chronicleexporter/marshal.go +++ b/exporter/chronicleexporter/marshal.go @@ -48,26 +48,21 @@ var supportedLogTypes = map[string]string{ "sql_server": "MICROSOFT_SQL", } -//go:generate mockery --name logMarshaler --filename mock_log_marshaler.go --structname MockMarshaler --inpackage -type logMarshaler interface { - MarshalRawLogs(ctx context.Context, ld plog.Logs) ([]*api.BatchCreateLogsRequest, error) - MarshalRawLogsForHTTP(ctx context.Context, ld plog.Logs) (map[string][]*api.ImportLogsRequest, error) -} type protoMarshaler struct { - cfg Config - teleSettings component.TelemetrySettings - startTime time.Time - customerID []byte - collectorID []byte + cfg Config + set component.TelemetrySettings + startTime time.Time + customerID []byte + collectorID []byte } -func newProtoMarshaler(cfg Config, teleSettings component.TelemetrySettings, customerID []byte) (*protoMarshaler, error) { +func newProtoMarshaler(cfg Config, set component.TelemetrySettings, customerID []byte) (*protoMarshaler, error) { return &protoMarshaler{ - startTime: time.Now(), - cfg: cfg, - teleSettings: teleSettings, - customerID: customerID[:], - collectorID: chronicleCollectorID[:], + startTime: time.Now(), + cfg: cfg, + set: set, + customerID: customerID[:], + collectorID: chronicleCollectorID[:], }, nil } @@ -93,7 +88,7 @@ func (m *protoMarshaler) extractRawLogs(ctx context.Context, ld plog.Logs) (map[ rawLog, logType, namespace, ingestionLabels, err := m.processLogRecord(ctx, logRecord, scopeLog, resourceLog) if err != nil { - m.teleSettings.Logger.Error("Error processing log record", zap.Error(err)) + m.set.Logger.Error("Error processing log record", zap.Error(err)) continue } @@ -323,7 +318,7 @@ func (m *protoMarshaler) getRawField(ctx context.Context, field 
string, logRecor return "", nil } - lrExpr, err := expr.NewOTTLLogRecordExpression(field, m.teleSettings) + lrExpr, err := expr.NewOTTLLogRecordExpression(field, m.set) if err != nil { return "", fmt.Errorf("raw_log_field is invalid: %s", err) } @@ -431,7 +426,7 @@ func (m *protoMarshaler) enforceMaximumsGRPCRequest(request *api.BatchCreateLogs } if len(entries) < 2 { - m.teleSettings.Logger.Error("Single entry exceeds max request size. Dropping entry", zap.Int("size", size)) + m.set.Logger.Error("Single entry exceeds max request size. Dropping entry", zap.Int("size", size)) return []*api.BatchCreateLogsRequest{} } @@ -484,7 +479,7 @@ func (m *protoMarshaler) extractRawHTTPLogs(ctx context.Context, ld plog.Logs) ( logRecord := scopeLog.LogRecords().At(k) rawLog, logType, namespace, ingestionLabels, err := m.processHTTPLogRecord(ctx, logRecord, scopeLog, resourceLog) if err != nil { - m.teleSettings.Logger.Error("Error processing log record", zap.Error(err)) + m.set.Logger.Error("Error processing log record", zap.Error(err)) continue } @@ -542,7 +537,7 @@ func (m *protoMarshaler) enforceMaximumsHTTPRequest(request *api.ImportLogsReque } if len(logs) < 2 { - m.teleSettings.Logger.Error("Single entry exceeds max request size. Dropping entry", zap.Int("size", size)) + m.set.Logger.Error("Single entry exceeds max request size. Dropping entry", zap.Int("size", size)) return []*api.ImportLogsRequest{} } diff --git a/exporter/chronicleexporter/marshal_test.go b/exporter/chronicleexporter/marshal_test.go index d7c219186..903f2aec9 100644 --- a/exporter/chronicleexporter/marshal_test.go +++ b/exporter/chronicleexporter/marshal_test.go @@ -1673,7 +1673,7 @@ func Test_getRawField(t *testing.T) { for _, tc := range getRawFieldCases { t.Run(tc.name, func(t *testing.T) { m := &protoMarshaler{} - m.teleSettings.Logger = zap.NewNop() + m.set.Logger = zap.NewNop() ctx := context.Background() @@ -1691,7 +1691,7 @@ func Test_getRawField(t *testing.T) { func Benchmark_getRawField(b *testing.B) { m := &protoMarshaler{} - m.teleSettings.Logger = zap.NewNop() + m.set.Logger = zap.NewNop() ctx := context.Background() diff --git a/exporter/chronicleexporter/mock_log_marshaler.go b/exporter/chronicleexporter/mock_log_marshaler.go deleted file mode 100644 index 9550d1155..000000000 --- a/exporter/chronicleexporter/mock_log_marshaler.go +++ /dev/null @@ -1,92 +0,0 @@ -// Code generated by mockery v2.50.0. DO NOT EDIT. 
- -package chronicleexporter - -import ( - context "context" - - api "github.com/observiq/bindplane-agent/exporter/chronicleexporter/protos/api" - - mock "github.com/stretchr/testify/mock" - - plog "go.opentelemetry.io/collector/pdata/plog" -) - -// MockMarshaler is an autogenerated mock type for the logMarshaler type -type MockMarshaler struct { - mock.Mock -} - -// MarshalRawLogs provides a mock function with given fields: ctx, ld -func (_m *MockMarshaler) MarshalRawLogs(ctx context.Context, ld plog.Logs) ([]*api.BatchCreateLogsRequest, error) { - ret := _m.Called(ctx, ld) - - if len(ret) == 0 { - panic("no return value specified for MarshalRawLogs") - } - - var r0 []*api.BatchCreateLogsRequest - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, plog.Logs) ([]*api.BatchCreateLogsRequest, error)); ok { - return rf(ctx, ld) - } - if rf, ok := ret.Get(0).(func(context.Context, plog.Logs) []*api.BatchCreateLogsRequest); ok { - r0 = rf(ctx, ld) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]*api.BatchCreateLogsRequest) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, plog.Logs) error); ok { - r1 = rf(ctx, ld) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// MarshalRawLogsForHTTP provides a mock function with given fields: ctx, ld -func (_m *MockMarshaler) MarshalRawLogsForHTTP(ctx context.Context, ld plog.Logs) (map[string][]*api.ImportLogsRequest, error) { - ret := _m.Called(ctx, ld) - - if len(ret) == 0 { - panic("no return value specified for MarshalRawLogsForHTTP") - } - - var r0 map[string][]*api.ImportLogsRequest - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, plog.Logs) (map[string][]*api.ImportLogsRequest, error)); ok { - return rf(ctx, ld) - } - if rf, ok := ret.Get(0).(func(context.Context, plog.Logs) map[string][]*api.ImportLogsRequest); ok { - r0 = rf(ctx, ld) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(map[string][]*api.ImportLogsRequest) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, plog.Logs) error); ok { - r1 = rf(ctx, ld) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// NewMockMarshaler creates a new instance of MockMarshaler. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewMockMarshaler(t interface { - mock.TestingT - Cleanup(func()) -}) *MockMarshaler { - mock := &MockMarshaler{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/exporter/chronicleexporter/protos/api/mocks/mock_service.go b/exporter/chronicleexporter/protos/api/mocks/mock_service.go deleted file mode 100644 index 7a2999353..000000000 --- a/exporter/chronicleexporter/protos/api/mocks/mock_service.go +++ /dev/null @@ -1,177 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: ./exporter/chronicleexporter/protos/generated/ingestion_grpc.pb.go - -// Package mocks is a generated GoMock package. -package mocks - -import ( - context "context" - reflect "reflect" - - gomock "github.com/golang/mock/gomock" - generated "github.com/observiq/bindplane-agent/exporter/chronicleexporter/protos/api" - grpc "google.golang.org/grpc" -) - -// MockIngestionServiceV2Client is a mock of IngestionServiceV2Client interface. -type MockIngestionServiceV2Client struct { - ctrl *gomock.Controller - recorder *MockIngestionServiceV2ClientMockRecorder -} - -// MockIngestionServiceV2ClientMockRecorder is the mock recorder for MockIngestionServiceV2Client. 
-type MockIngestionServiceV2ClientMockRecorder struct { - mock *MockIngestionServiceV2Client -} - -// NewMockIngestionServiceV2Client creates a new mock instance. -func NewMockIngestionServiceV2Client(ctrl *gomock.Controller) *MockIngestionServiceV2Client { - mock := &MockIngestionServiceV2Client{ctrl: ctrl} - mock.recorder = &MockIngestionServiceV2ClientMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockIngestionServiceV2Client) EXPECT() *MockIngestionServiceV2ClientMockRecorder { - return m.recorder -} - -// BatchCreateEvents mocks base method. -func (m *MockIngestionServiceV2Client) BatchCreateEvents(ctx context.Context, in *generated.BatchCreateEventsRequest, opts ...grpc.CallOption) (*generated.BatchCreateEventsResponse, error) { - m.ctrl.T.Helper() - varargs := []interface{}{ctx, in} - for _, a := range opts { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "BatchCreateEvents", varargs...) - ret0, _ := ret[0].(*generated.BatchCreateEventsResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// BatchCreateEvents indicates an expected call of BatchCreateEvents. -func (mr *MockIngestionServiceV2ClientMockRecorder) BatchCreateEvents(ctx, in interface{}, opts ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{ctx, in}, opts...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BatchCreateEvents", reflect.TypeOf((*MockIngestionServiceV2Client)(nil).BatchCreateEvents), varargs...) -} - -// BatchCreateLogs mocks base method. -func (m *MockIngestionServiceV2Client) BatchCreateLogs(ctx context.Context, in *generated.BatchCreateLogsRequest, opts ...grpc.CallOption) (*generated.BatchCreateLogsResponse, error) { - m.ctrl.T.Helper() - varargs := []interface{}{ctx, in} - for _, a := range opts { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "BatchCreateLogs", varargs...) - ret0, _ := ret[0].(*generated.BatchCreateLogsResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// BatchCreateLogs indicates an expected call of BatchCreateLogs. -func (mr *MockIngestionServiceV2ClientMockRecorder) BatchCreateLogs(ctx, in interface{}, opts ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{ctx, in}, opts...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BatchCreateLogs", reflect.TypeOf((*MockIngestionServiceV2Client)(nil).BatchCreateLogs), varargs...) -} - -// MockIngestionServiceV2Server is a mock of IngestionServiceV2Server interface. -type MockIngestionServiceV2Server struct { - ctrl *gomock.Controller - recorder *MockIngestionServiceV2ServerMockRecorder -} - -// MockIngestionServiceV2ServerMockRecorder is the mock recorder for MockIngestionServiceV2Server. -type MockIngestionServiceV2ServerMockRecorder struct { - mock *MockIngestionServiceV2Server -} - -// NewMockIngestionServiceV2Server creates a new mock instance. -func NewMockIngestionServiceV2Server(ctrl *gomock.Controller) *MockIngestionServiceV2Server { - mock := &MockIngestionServiceV2Server{ctrl: ctrl} - mock.recorder = &MockIngestionServiceV2ServerMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockIngestionServiceV2Server) EXPECT() *MockIngestionServiceV2ServerMockRecorder { - return m.recorder -} - -// BatchCreateEvents mocks base method. 
-func (m *MockIngestionServiceV2Server) BatchCreateEvents(arg0 context.Context, arg1 *generated.BatchCreateEventsRequest) (*generated.BatchCreateEventsResponse, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "BatchCreateEvents", arg0, arg1) - ret0, _ := ret[0].(*generated.BatchCreateEventsResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// BatchCreateEvents indicates an expected call of BatchCreateEvents. -func (mr *MockIngestionServiceV2ServerMockRecorder) BatchCreateEvents(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BatchCreateEvents", reflect.TypeOf((*MockIngestionServiceV2Server)(nil).BatchCreateEvents), arg0, arg1) -} - -// BatchCreateLogs mocks base method. -func (m *MockIngestionServiceV2Server) BatchCreateLogs(arg0 context.Context, arg1 *generated.BatchCreateLogsRequest) (*generated.BatchCreateLogsResponse, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "BatchCreateLogs", arg0, arg1) - ret0, _ := ret[0].(*generated.BatchCreateLogsResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// BatchCreateLogs indicates an expected call of BatchCreateLogs. -func (mr *MockIngestionServiceV2ServerMockRecorder) BatchCreateLogs(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BatchCreateLogs", reflect.TypeOf((*MockIngestionServiceV2Server)(nil).BatchCreateLogs), arg0, arg1) -} - -// mustEmbedUnimplementedIngestionServiceV2Server mocks base method. -func (m *MockIngestionServiceV2Server) mustEmbedUnimplementedIngestionServiceV2Server() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "mustEmbedUnimplementedIngestionServiceV2Server") -} - -// mustEmbedUnimplementedIngestionServiceV2Server indicates an expected call of mustEmbedUnimplementedIngestionServiceV2Server. -func (mr *MockIngestionServiceV2ServerMockRecorder) mustEmbedUnimplementedIngestionServiceV2Server() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "mustEmbedUnimplementedIngestionServiceV2Server", reflect.TypeOf((*MockIngestionServiceV2Server)(nil).mustEmbedUnimplementedIngestionServiceV2Server)) -} - -// MockUnsafeIngestionServiceV2Server is a mock of UnsafeIngestionServiceV2Server interface. -type MockUnsafeIngestionServiceV2Server struct { - ctrl *gomock.Controller - recorder *MockUnsafeIngestionServiceV2ServerMockRecorder -} - -// MockUnsafeIngestionServiceV2ServerMockRecorder is the mock recorder for MockUnsafeIngestionServiceV2Server. -type MockUnsafeIngestionServiceV2ServerMockRecorder struct { - mock *MockUnsafeIngestionServiceV2Server -} - -// NewMockUnsafeIngestionServiceV2Server creates a new mock instance. -func NewMockUnsafeIngestionServiceV2Server(ctrl *gomock.Controller) *MockUnsafeIngestionServiceV2Server { - mock := &MockUnsafeIngestionServiceV2Server{ctrl: ctrl} - mock.recorder = &MockUnsafeIngestionServiceV2ServerMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockUnsafeIngestionServiceV2Server) EXPECT() *MockUnsafeIngestionServiceV2ServerMockRecorder { - return m.recorder -} - -// mustEmbedUnimplementedIngestionServiceV2Server mocks base method. 
-func (m *MockUnsafeIngestionServiceV2Server) mustEmbedUnimplementedIngestionServiceV2Server() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "mustEmbedUnimplementedIngestionServiceV2Server") -} - -// mustEmbedUnimplementedIngestionServiceV2Server indicates an expected call of mustEmbedUnimplementedIngestionServiceV2Server. -func (mr *MockUnsafeIngestionServiceV2ServerMockRecorder) mustEmbedUnimplementedIngestionServiceV2Server() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "mustEmbedUnimplementedIngestionServiceV2Server", reflect.TypeOf((*MockUnsafeIngestionServiceV2Server)(nil).mustEmbedUnimplementedIngestionServiceV2Server)) -} diff --git a/exporter/chronicleexporter/util.go b/exporter/chronicleexporter/util.go new file mode 100644 index 000000000..02c212d60 --- /dev/null +++ b/exporter/chronicleexporter/util.go @@ -0,0 +1,48 @@ +// Copyright observIQ, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package chronicleexporter + +import ( + "context" + "errors" + "fmt" + "os" + + "golang.org/x/oauth2/google" +) + +func googleCredentials(ctx context.Context, cfg *Config) (*google.Credentials, error) { + scope := grpcScope + if cfg.Protocol == protocolHTTPS { + scope = httpScope + } + switch { + case cfg.Creds != "": + return google.CredentialsFromJSON(ctx, []byte(cfg.Creds), scope) + case cfg.CredsFilePath != "": + credsData, err := os.ReadFile(cfg.CredsFilePath) + if err != nil { + return nil, fmt.Errorf("read credentials file: %w", err) + } + + if len(credsData) == 0 { + return nil, errors.New("credentials file is empty") + } + + return google.CredentialsFromJSON(ctx, credsData, scope) + default: + return google.FindDefaultCredentials(ctx, scope) + } +} diff --git a/exporter/chronicleexporter/util_test.go b/exporter/chronicleexporter/util_test.go new file mode 100644 index 000000000..73d7dd813 --- /dev/null +++ b/exporter/chronicleexporter/util_test.go @@ -0,0 +1,31 @@ +// Copyright observIQ, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package chronicleexporter + +import ( + "context" + + "golang.org/x/oauth2" +) + +func testTokenSource(ctx context.Context, cfg *Config) (oauth2.TokenSource, error) { + return &emptyTokenSource{}, nil +} + +type emptyTokenSource struct{} + +func (t *emptyTokenSource) Token() (*oauth2.Token, error) { + return &oauth2.Token{}, nil +}
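
Note on the new util.go: googleCredentials resolves credentials from inline JSON, a credentials file, or Application Default Credentials, and picks the OAuth scope from the configured protocol. A minimal sketch of how an exporter could consume it follows; the function name, error message, and the choice to build an oauth2-backed HTTP client are illustrative assumptions, not code from this change.

package chronicleexporter

import (
	"context"
	"fmt"
	"net/http"

	"golang.org/x/oauth2"
)

// newAuthenticatedClient is a sketch (not part of this patch) showing how the
// credentials resolved by googleCredentials can back an authenticated client.
func newAuthenticatedClient(ctx context.Context, cfg *Config) (*http.Client, error) {
	// Resolves inline JSON, a credentials file, or ADC, scoped per protocol.
	creds, err := googleCredentials(ctx, cfg)
	if err != nil {
		return nil, fmt.Errorf("resolve credentials: %w", err)
	}
	// oauth2.NewClient returns an *http.Client whose transport attaches and
	// refreshes bearer tokens obtained from the credentials' TokenSource.
	return oauth2.NewClient(ctx, creds.TokenSource), nil
}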
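
Note on the new util_test.go: emptyTokenSource satisfies oauth2.TokenSource with a no-op token, presumably so exporter tests can exercise an authenticated client against a local server without real Google credentials. A sketch under that assumption; the test name and handler are illustrative and not part of this change.

package chronicleexporter

import (
	"context"
	"net/http"
	"net/http/httptest"
	"testing"

	"golang.org/x/oauth2"
)

// TestEmptyTokenSourceClient is a sketch (not part of this patch): the stub
// token source yields an empty token, so requests still go out, just without
// meaningful Authorization contents.
func TestEmptyTokenSourceClient(t *testing.T) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()

	// Build a client backed by the stub token source from util_test.go.
	client := oauth2.NewClient(context.Background(), &emptyTokenSource{})
	resp, err := client.Get(srv.URL)
	if err != nil {
		t.Fatalf("request failed: %v", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		t.Fatalf("unexpected status: %d", resp.StatusCode)
	}
}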