diff --git a/exporter/chronicleexporter/exporter.go b/exporter/chronicleexporter/exporter.go index 90de04f50..0dbf31987 100644 --- a/exporter/chronicleexporter/exporter.go +++ b/exporter/chronicleexporter/exporter.go @@ -18,11 +18,9 @@ import ( "bytes" "compress/gzip" "context" - "errors" "fmt" "io" "net/http" - "os" "github.com/google/uuid" "github.com/observiq/bindplane-agent/exporter/chronicleexporter/protos/api" @@ -33,7 +31,6 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/zap" "golang.org/x/oauth2" - "golang.org/x/oauth2/google" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" @@ -82,23 +79,19 @@ func newExporter(cfg *Config, params exporter.Settings, exporterID string) (*chr }, nil } -func (ce *chronicleExporter) Start(_ context.Context, _ component.Host) error { - creds, err := loadGoogleCredentials(ce.cfg) +func (ce *chronicleExporter) Start(ctx context.Context, _ component.Host) error { + ts, err := tokenSource(ctx, ce.cfg) if err != nil { return fmt.Errorf("load Google credentials: %w", err) } if ce.cfg.Protocol == protocolHTTPS { - ce.httpClient = oauth2.NewClient(context.Background(), creds.TokenSource) + ce.httpClient = oauth2.NewClient(context.Background(), ts) return nil } - opts := []grpc.DialOption{ - // Apply OAuth tokens for each RPC call - grpc.WithPerRPCCredentials(oauth.TokenSource{TokenSource: creds.TokenSource}), - grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")), - } - conn, err := grpc.NewClient(ce.cfg.Endpoint+":443", opts...) + endpoint, dialOpts := grpcClientParams(ce.cfg.Endpoint, ts) + conn, err := grpc.NewClient(endpoint, dialOpts...) if err != nil { return fmt.Errorf("dial: %w", err) } @@ -145,31 +138,6 @@ func (ce *chronicleExporter) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: false} } -func loadGoogleCredentials(cfg *Config) (*google.Credentials, error) { - scope := grpcScope - if cfg.Protocol == protocolHTTPS { - scope = httpScope - } - - switch { - case cfg.Creds != "": - return google.CredentialsFromJSON(context.Background(), []byte(cfg.Creds), scope) - case cfg.CredsFilePath != "": - credsData, err := os.ReadFile(cfg.CredsFilePath) - if err != nil { - return nil, fmt.Errorf("read credentials file: %w", err) - } - - if len(credsData) == 0 { - return nil, errors.New("credentials file is empty") - } - - return google.CredentialsFromJSON(context.Background(), credsData, scope) - default: - return google.FindDefaultCredentials(context.Background(), scope) - } -} - func (ce *chronicleExporter) logsDataPusher(ctx context.Context, ld plog.Logs) error { payloads, err := ce.marshaler.MarshalRawLogs(ctx, ld) if err != nil { @@ -178,7 +146,7 @@ func (ce *chronicleExporter) logsDataPusher(ctx context.Context, ld plog.Logs) e for _, payload := range payloads { if err := ce.uploadToChronicle(ctx, payload); err != nil { - return fmt.Errorf("upload to chronicle: %w", err) + return err } } @@ -229,7 +197,7 @@ func (ce *chronicleExporter) logsHTTPDataPusher(ctx context.Context, ld plog.Log for logType, logTypePayloads := range payloads { for _, payload := range logTypePayloads { if err := ce.uploadToChronicleHTTP(ctx, payload, logType); err != nil { - return fmt.Errorf("upload to chronicle: %w", err) + return err } } } @@ -237,14 +205,6 @@ func (ce *chronicleExporter) logsHTTPDataPusher(ctx context.Context, ld plog.Log return nil } -// This uses the DataPlane URL for the request -// URL for the request: https://{region}-chronicle.googleapis.com/{version}/projects/{project}/location/{region}/instances/{customerID}/logTypes/{logtype}/logs:import -func buildEndpoint(cfg *Config, logType string) string { - // Location Endpoint Version Project Location Instance LogType - formatString := "https://%s-%s/%s/projects/%s/locations/%s/instances/%s/logTypes/%s/logs:import" - return fmt.Sprintf(formatString, cfg.Location, cfg.Endpoint, "v1alpha", cfg.Project, cfg.Location, cfg.CustomerID, logType) -} - func (ce *chronicleExporter) uploadToChronicleHTTP(ctx context.Context, logs *api.ImportLogsRequest, logType string) error { data, err := protojson.Marshal(logs) @@ -268,7 +228,7 @@ func (ce *chronicleExporter) uploadToChronicleHTTP(ctx context.Context, logs *ap body = bytes.NewBuffer(data) } - request, err := http.NewRequestWithContext(ctx, "POST", buildEndpoint(ce.cfg, logType), body) + request, err := http.NewRequestWithContext(ctx, "POST", httpEndpoint(ce.cfg, logType), body) if err != nil { return fmt.Errorf("create request: %w", err) } @@ -286,14 +246,38 @@ func (ce *chronicleExporter) uploadToChronicleHTTP(ctx context.Context, logs *ap defer resp.Body.Close() respBody, err := io.ReadAll(resp.Body) - if resp.StatusCode != http.StatusOK { - if err != nil { - ce.set.Logger.Warn("Failed to read response body", zap.Error(err)) - } else { - ce.set.Logger.Warn("Received non-OK response from Chronicle", zap.String("status", resp.Status), zap.ByteString("response", respBody)) - } - return fmt.Errorf("received non-OK response from Chronicle: %s", resp.Status) + if err == nil && resp.StatusCode == http.StatusOK { + return nil } - return nil + if err != nil { + ce.set.Logger.Warn("Failed to read response body", zap.Error(err)) + } else { + ce.set.Logger.Warn("Received non-OK response from Chronicle", zap.String("status", resp.Status), zap.ByteString("response", respBody)) + } + + // TODO interpret with https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/internal/coreinternal/errorutil/http.go + statusErr := fmt.Errorf("upload logs to chronicle: %s", resp.Status) + switch resp.StatusCode { + case http.StatusInternalServerError, http.StatusServiceUnavailable: // potentially transient + return statusErr + default: + return consumererror.NewPermanent(statusErr) + } +} + +// Override for testing +var grpcClientParams = func(cfgEndpoint string, ts oauth2.TokenSource) (string, []grpc.DialOption) { + return cfgEndpoint + ":443", []grpc.DialOption{ + grpc.WithPerRPCCredentials(oauth.TokenSource{TokenSource: ts}), + grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")), + } +} + +// This uses the DataPlane URL for the request +// URL for the request: https://{region}-chronicle.googleapis.com/{version}/projects/{project}/location/{region}/instances/{customerID} +// Override for testing +var httpEndpoint = func(cfg *Config, logType string) string { + formatString := "https://%s-%s/v1alpha/projects/%s/locations/%s/instances/%s/logTypes/%s/logs:import" + return fmt.Sprintf(formatString, cfg.Location, cfg.Endpoint, cfg.Project, cfg.Location, cfg.CustomerID, logType) } diff --git a/exporter/chronicleexporter/grpc_exporter_test.go b/exporter/chronicleexporter/grpc_exporter_test.go new file mode 100644 index 000000000..f95e89ade --- /dev/null +++ b/exporter/chronicleexporter/grpc_exporter_test.go @@ -0,0 +1,193 @@ +// Copyright observIQ, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package chronicleexporter + +import ( + "context" + "net" + "testing" + + "github.com/observiq/bindplane-agent/exporter/chronicleexporter/protos/api" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/pdata/plog" + "golang.org/x/oauth2" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" +) + +type mockGRPCServer struct { + api.UnimplementedIngestionServiceV2Server + srv *grpc.Server + requests int + handler mockBatchCreateLogsHandler +} + +var _ api.IngestionServiceV2Server = (*mockGRPCServer)(nil) + +type mockBatchCreateLogsHandler func(*api.BatchCreateLogsRequest) (*api.BatchCreateLogsResponse, error) + +func newMockGRPCServer(t *testing.T, handler mockBatchCreateLogsHandler) (*mockGRPCServer, string) { + mockServer := &mockGRPCServer{ + srv: grpc.NewServer(), + handler: handler, + } + ln, err := net.Listen("tcp", "localhost:") + require.NoError(t, err) + + mockServer.srv.RegisterService(&api.IngestionServiceV2_ServiceDesc, mockServer) + go func() { + require.NoError(t, mockServer.srv.Serve(ln)) + }() + return mockServer, ln.Addr().String() +} + +func (s *mockGRPCServer) BatchCreateEvents(_ context.Context, _ *api.BatchCreateEventsRequest) (*api.BatchCreateEventsResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "TODO") +} +func (s *mockGRPCServer) BatchCreateLogs(_ context.Context, req *api.BatchCreateLogsRequest) (*api.BatchCreateLogsResponse, error) { + s.requests++ + return s.handler(req) +} + +func TestGRPCExporter(t *testing.T) { + // Override the token source so that we don't have to provide real credentials + secureTokenSource := tokenSource + defer func() { + tokenSource = secureTokenSource + }() + tokenSource = func(context.Context, *Config) (oauth2.TokenSource, error) { + return &emptyTokenSource{}, nil + } + + // By default, tests will apply the following changes to NewFactory.CreateDefaultConfig() + defaultCfgMod := func(cfg *Config) { + cfg.Protocol = protocolGRPC + cfg.CustomerID = "00000000-1111-2222-3333-444444444444" + cfg.LogType = "FAKE" + cfg.QueueConfig.Enabled = false + cfg.BackOffConfig.Enabled = false + } + + testCases := []struct { + name string + handler mockBatchCreateLogsHandler + input plog.Logs + expectedRequests int + expectedErr string + permanentErr bool + }{ + { + name: "empty log record", + input: plog.NewLogs(), + expectedRequests: 0, + }, + { + name: "single log record", + handler: func(_ *api.BatchCreateLogsRequest) (*api.BatchCreateLogsResponse, error) { + return &api.BatchCreateLogsResponse{}, nil + }, + input: func() plog.Logs { + logs := plog.NewLogs() + rls := logs.ResourceLogs().AppendEmpty() + sls := rls.ScopeLogs().AppendEmpty() + lrs := sls.LogRecords().AppendEmpty() + lrs.Body().SetStr("Test") + return logs + }(), + expectedRequests: 1, + }, + // TODO test splitting large payloads + { + name: "transient_error", + handler: func(_ *api.BatchCreateLogsRequest) (*api.BatchCreateLogsResponse, error) { + return nil, status.Error(codes.Unavailable, "Service Unavailable") + }, + input: func() plog.Logs { + logs := plog.NewLogs() + rls := logs.ResourceLogs().AppendEmpty() + sls := rls.ScopeLogs().AppendEmpty() + lrs := sls.LogRecords().AppendEmpty() + lrs.Body().SetStr("Test") + return logs + }(), + expectedRequests: 1, + expectedErr: "upload logs to chronicle: rpc error: code = Unavailable desc = Service Unavailable", + permanentErr: false, + }, + { + name: "permanent_error", + handler: func(_ *api.BatchCreateLogsRequest) (*api.BatchCreateLogsResponse, error) { + return nil, status.Error(codes.Unauthenticated, "Unauthorized") + }, + input: func() plog.Logs { + logs := plog.NewLogs() + rls := logs.ResourceLogs().AppendEmpty() + sls := rls.ScopeLogs().AppendEmpty() + lrs := sls.LogRecords().AppendEmpty() + lrs.Body().SetStr("Test") + return logs + }(), + expectedRequests: 1, + expectedErr: "Permanent error: upload logs to chronicle: rpc error: code = Unauthenticated desc = Unauthorized", + permanentErr: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + mockServer, endpoint := newMockGRPCServer(t, tc.handler) + defer mockServer.srv.GracefulStop() + + // Override the client params for testing to we can connect to the mock server + secureGPPCClientParams := grpcClientParams + defer func() { + grpcClientParams = secureGPPCClientParams + }() + grpcClientParams = func(string, oauth2.TokenSource) (string, []grpc.DialOption) { + return endpoint, []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())} + } + + f := NewFactory() + cfg := f.CreateDefaultConfig().(*Config) + defaultCfgMod(cfg) + cfg.Endpoint = endpoint + + require.NoError(t, cfg.Validate()) + + ctx := context.Background() + exp, err := f.CreateLogs(ctx, exportertest.NewNopSettings(), cfg) + require.NoError(t, err) + require.NoError(t, exp.Start(ctx, componenttest.NewNopHost())) + defer func() { + require.NoError(t, exp.Shutdown(ctx)) + }() + + err = exp.ConsumeLogs(ctx, tc.input) + if tc.expectedErr == "" { + require.NoError(t, err) + } else { + require.EqualError(t, err, tc.expectedErr) + require.Equal(t, tc.permanentErr, consumererror.IsPermanent(err)) + } + + require.Equal(t, tc.expectedRequests, mockServer.requests) + }) + } +} diff --git a/exporter/chronicleexporter/hostmetrics.go b/exporter/chronicleexporter/hostmetrics.go index a619087df..cfcf9e528 100644 --- a/exporter/chronicleexporter/hostmetrics.go +++ b/exporter/chronicleexporter/hostmetrics.go @@ -88,10 +88,8 @@ func (hmr *hostMetricsReporter) start() { ctx, cancel := context.WithCancel(context.Background()) hmr.cancel = cancel hmr.wg.Add(1) - go func() { defer hmr.wg.Done() - for { select { case <-ctx.Done(): diff --git a/exporter/chronicleexporter/http_exporter_test.go b/exporter/chronicleexporter/http_exporter_test.go new file mode 100644 index 000000000..f20b27445 --- /dev/null +++ b/exporter/chronicleexporter/http_exporter_test.go @@ -0,0 +1,199 @@ +// Copyright observIQ, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package chronicleexporter + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/pdata/plog" + "golang.org/x/oauth2" +) + +type mockHTTPServer struct { + srv *httptest.Server + requestCount int +} + +func newMockHTTPServer(logTypeHandlers map[string]http.HandlerFunc) *mockHTTPServer { + mockServer := mockHTTPServer{} + mux := http.NewServeMux() + for logType, handlerFunc := range logTypeHandlers { + pattern := fmt.Sprintf("/logTypes/%s/logs:import", logType) + mux.HandleFunc(pattern, func(w http.ResponseWriter, r *http.Request) { + mockServer.requestCount++ + handlerFunc(w, r) + }) + } + mockServer.srv = httptest.NewServer(mux) + return &mockServer +} + +type emptyTokenSource struct{} + +func (t *emptyTokenSource) Token() (*oauth2.Token, error) { + return &oauth2.Token{}, nil +} + +func TestHTTPExporter(t *testing.T) { + // Override the token source so that we don't have to provide real credentials + secureTokenSource := tokenSource + defer func() { + tokenSource = secureTokenSource + }() + tokenSource = func(context.Context, *Config) (oauth2.TokenSource, error) { + return &emptyTokenSource{}, nil + } + + // By default, tests will apply the following changes to NewFactory.CreateDefaultConfig() + defaultCfgMod := func(cfg *Config) { + cfg.Protocol = protocolHTTPS + cfg.Location = "us" + cfg.CustomerID = "00000000-1111-2222-3333-444444444444" + cfg.Project = "fake" + cfg.Forwarder = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" + cfg.LogType = "FAKE" + cfg.QueueConfig.Enabled = false + cfg.BackOffConfig.Enabled = false + } + + defaultHandlers := map[string]http.HandlerFunc{ + "FAKE": func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + }, + } + + testCases := []struct { + name string + cfgMod func(cfg *Config) + handlers map[string]http.HandlerFunc + input plog.Logs + expectedRequests int + expectedErr string + permanentErr bool + }{ + { + name: "empty log record", + input: plog.NewLogs(), + expectedRequests: 0, + }, + { + name: "single log record", + input: func() plog.Logs { + logs := plog.NewLogs() + rls := logs.ResourceLogs().AppendEmpty() + sls := rls.ScopeLogs().AppendEmpty() + lrs := sls.LogRecords().AppendEmpty() + lrs.Body().SetStr("Test") + return logs + }(), + expectedRequests: 1, + }, + // TODO test splitting large payloads + { + name: "transient_error", + handlers: map[string]http.HandlerFunc{ + "FAKE": func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusServiceUnavailable) + }, + }, + input: func() plog.Logs { + logs := plog.NewLogs() + rls := logs.ResourceLogs().AppendEmpty() + sls := rls.ScopeLogs().AppendEmpty() + lrs := sls.LogRecords().AppendEmpty() + lrs.Body().SetStr("Test") + return logs + }(), + expectedRequests: 1, + expectedErr: "upload logs to chronicle: 503 Service Unavailable", + permanentErr: false, + }, + { + name: "permanent_error", + handlers: map[string]http.HandlerFunc{ + "FAKE": func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusUnauthorized) + }, + }, + input: func() plog.Logs { + logs := plog.NewLogs() + rls := logs.ResourceLogs().AppendEmpty() + sls := rls.ScopeLogs().AppendEmpty() + lrs := sls.LogRecords().AppendEmpty() + lrs.Body().SetStr("Test") + return logs + }(), + expectedRequests: 1, + expectedErr: "Permanent error: upload logs to chronicle: 401 Unauthorized", + permanentErr: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Create a mock server so we are not dependent on the actual Chronicle service + handlers := defaultHandlers + if tc.handlers != nil { + handlers = tc.handlers + } + mockServer := newMockHTTPServer(handlers) + defer mockServer.srv.Close() + + // Override the endpoint builder so that we can point to the mock server + secureHTTPEndpoint := httpEndpoint + defer func() { + httpEndpoint = secureHTTPEndpoint + }() + httpEndpoint = func(_ *Config, logType string) string { + return fmt.Sprintf("%s/logTypes/%s/logs:import", mockServer.srv.URL, logType) + } + + f := NewFactory() + cfg := f.CreateDefaultConfig().(*Config) + if tc.cfgMod != nil { + tc.cfgMod(cfg) + } else { + defaultCfgMod(cfg) + } + require.NoError(t, cfg.Validate()) + + ctx := context.Background() + exp, err := f.CreateLogs(ctx, exportertest.NewNopSettings(), cfg) + require.NoError(t, err) + require.NoError(t, exp.Start(ctx, componenttest.NewNopHost())) + defer func() { + require.NoError(t, exp.Shutdown(ctx)) + }() + + err = exp.ConsumeLogs(ctx, tc.input) + if tc.expectedErr == "" { + require.NoError(t, err) + } else { + require.EqualError(t, err, tc.expectedErr) + require.Equal(t, tc.permanentErr, consumererror.IsPermanent(err)) + } + + require.Equal(t, tc.expectedRequests, mockServer.requestCount) + }) + } +} diff --git a/exporter/chronicleexporter/util.go b/exporter/chronicleexporter/util.go new file mode 100644 index 000000000..21ef88e3d --- /dev/null +++ b/exporter/chronicleexporter/util.go @@ -0,0 +1,58 @@ +// Copyright observIQ, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package chronicleexporter + +import ( + "context" + "errors" + "fmt" + "os" + + "golang.org/x/oauth2" + "golang.org/x/oauth2/google" +) + +// Override for testing +var tokenSource = func(ctx context.Context, cfg *Config) (oauth2.TokenSource, error) { + creds, err := googleCredentials(ctx, cfg) + if err != nil { + return nil, err + } + return creds.TokenSource, nil +} + +func googleCredentials(ctx context.Context, cfg *Config) (*google.Credentials, error) { + scope := grpcScope + if cfg.Protocol == protocolHTTPS { + scope = httpScope + } + switch { + case cfg.Creds != "": + return google.CredentialsFromJSON(ctx, []byte(cfg.Creds), scope) + case cfg.CredsFilePath != "": + credsData, err := os.ReadFile(cfg.CredsFilePath) + if err != nil { + return nil, fmt.Errorf("read credentials file: %w", err) + } + + if len(credsData) == 0 { + return nil, errors.New("credentials file is empty") + } + + return google.CredentialsFromJSON(ctx, credsData, scope) + default: + return google.FindDefaultCredentials(ctx, scope) + } +}