Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Add new tests for chronicle exporter with http and grpc servers #2049

Merged
merged 2 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 40 additions & 56 deletions exporter/chronicleexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@ import (
"bytes"
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"net/http"
"os"

"github.com/google/uuid"
"github.com/observiq/bindplane-agent/exporter/chronicleexporter/protos/api"
Expand All @@ -33,7 +31,6 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -82,23 +79,19 @@ func newExporter(cfg *Config, params exporter.Settings, exporterID string) (*chr
}, nil
}

func (ce *chronicleExporter) Start(_ context.Context, _ component.Host) error {
creds, err := loadGoogleCredentials(ce.cfg)
func (ce *chronicleExporter) Start(ctx context.Context, _ component.Host) error {
ts, err := tokenSource(ctx, ce.cfg)
if err != nil {
return fmt.Errorf("load Google credentials: %w", err)
}

if ce.cfg.Protocol == protocolHTTPS {
ce.httpClient = oauth2.NewClient(context.Background(), creds.TokenSource)
ce.httpClient = oauth2.NewClient(context.Background(), ts)
return nil
}

opts := []grpc.DialOption{
// Apply OAuth tokens for each RPC call
grpc.WithPerRPCCredentials(oauth.TokenSource{TokenSource: creds.TokenSource}),
grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
}
conn, err := grpc.NewClient(ce.cfg.Endpoint+":443", opts...)
endpoint, dialOpts := grpcClientParams(ce.cfg.Endpoint, ts)
conn, err := grpc.NewClient(endpoint, dialOpts...)
if err != nil {
return fmt.Errorf("dial: %w", err)
}
Expand Down Expand Up @@ -145,31 +138,6 @@ func (ce *chronicleExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func loadGoogleCredentials(cfg *Config) (*google.Credentials, error) {
scope := grpcScope
if cfg.Protocol == protocolHTTPS {
scope = httpScope
}

switch {
case cfg.Creds != "":
return google.CredentialsFromJSON(context.Background(), []byte(cfg.Creds), scope)
case cfg.CredsFilePath != "":
credsData, err := os.ReadFile(cfg.CredsFilePath)
if err != nil {
return nil, fmt.Errorf("read credentials file: %w", err)
}

if len(credsData) == 0 {
return nil, errors.New("credentials file is empty")
}

return google.CredentialsFromJSON(context.Background(), credsData, scope)
default:
return google.FindDefaultCredentials(context.Background(), scope)
}
}

func (ce *chronicleExporter) logsDataPusher(ctx context.Context, ld plog.Logs) error {
payloads, err := ce.marshaler.MarshalRawLogs(ctx, ld)
if err != nil {
Expand All @@ -178,7 +146,7 @@ func (ce *chronicleExporter) logsDataPusher(ctx context.Context, ld plog.Logs) e

for _, payload := range payloads {
if err := ce.uploadToChronicle(ctx, payload); err != nil {
return fmt.Errorf("upload to chronicle: %w", err)
return err
}
}

Expand Down Expand Up @@ -229,22 +197,14 @@ func (ce *chronicleExporter) logsHTTPDataPusher(ctx context.Context, ld plog.Log
for logType, logTypePayloads := range payloads {
for _, payload := range logTypePayloads {
if err := ce.uploadToChronicleHTTP(ctx, payload, logType); err != nil {
return fmt.Errorf("upload to chronicle: %w", err)
return err
}
}
}

return nil
}

// This uses the DataPlane URL for the request
// URL for the request: https://{region}-chronicle.googleapis.com/{version}/projects/{project}/location/{region}/instances/{customerID}/logTypes/{logtype}/logs:import
func buildEndpoint(cfg *Config, logType string) string {
// Location Endpoint Version Project Location Instance LogType
formatString := "https://%s-%s/%s/projects/%s/locations/%s/instances/%s/logTypes/%s/logs:import"
return fmt.Sprintf(formatString, cfg.Location, cfg.Endpoint, "v1alpha", cfg.Project, cfg.Location, cfg.CustomerID, logType)
}

func (ce *chronicleExporter) uploadToChronicleHTTP(ctx context.Context, logs *api.ImportLogsRequest, logType string) error {

data, err := protojson.Marshal(logs)
Expand All @@ -268,7 +228,7 @@ func (ce *chronicleExporter) uploadToChronicleHTTP(ctx context.Context, logs *ap
body = bytes.NewBuffer(data)
}

request, err := http.NewRequestWithContext(ctx, "POST", buildEndpoint(ce.cfg, logType), body)
request, err := http.NewRequestWithContext(ctx, "POST", httpEndpoint(ce.cfg, logType), body)
if err != nil {
return fmt.Errorf("create request: %w", err)
}
Expand All @@ -286,14 +246,38 @@ func (ce *chronicleExporter) uploadToChronicleHTTP(ctx context.Context, logs *ap
defer resp.Body.Close()

respBody, err := io.ReadAll(resp.Body)
if resp.StatusCode != http.StatusOK {
if err != nil {
ce.set.Logger.Warn("Failed to read response body", zap.Error(err))
} else {
ce.set.Logger.Warn("Received non-OK response from Chronicle", zap.String("status", resp.Status), zap.ByteString("response", respBody))
}
return fmt.Errorf("received non-OK response from Chronicle: %s", resp.Status)
if err == nil && resp.StatusCode == http.StatusOK {
return nil
}

return nil
if err != nil {
ce.set.Logger.Warn("Failed to read response body", zap.Error(err))
} else {
ce.set.Logger.Warn("Received non-OK response from Chronicle", zap.String("status", resp.Status), zap.ByteString("response", respBody))
}

// TODO interpret with https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/internal/coreinternal/errorutil/http.go
statusErr := fmt.Errorf("upload logs to chronicle: %s", resp.Status)
switch resp.StatusCode {
case http.StatusInternalServerError, http.StatusServiceUnavailable: // potentially transient
return statusErr
default:
return consumererror.NewPermanent(statusErr)
}
}

// Override for testing
var grpcClientParams = func(cfgEndpoint string, ts oauth2.TokenSource) (string, []grpc.DialOption) {
return cfgEndpoint + ":443", []grpc.DialOption{
grpc.WithPerRPCCredentials(oauth.TokenSource{TokenSource: ts}),
grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
}
}

// This uses the DataPlane URL for the request
// URL for the request: https://{region}-chronicle.googleapis.com/{version}/projects/{project}/location/{region}/instances/{customerID}
// Override for testing
var httpEndpoint = func(cfg *Config, logType string) string {
formatString := "https://%s-%s/v1alpha/projects/%s/locations/%s/instances/%s/logTypes/%s/logs:import"
return fmt.Sprintf(formatString, cfg.Location, cfg.Endpoint, cfg.Project, cfg.Location, cfg.CustomerID, logType)
}
193 changes: 193 additions & 0 deletions exporter/chronicleexporter/grpc_exporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
// Copyright observIQ, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package chronicleexporter

import (
"context"
"net"
"testing"

"github.com/observiq/bindplane-agent/exporter/chronicleexporter/protos/api"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/pdata/plog"
"golang.org/x/oauth2"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
)

type mockGRPCServer struct {
api.UnimplementedIngestionServiceV2Server
srv *grpc.Server
requests int
handler mockBatchCreateLogsHandler
}

var _ api.IngestionServiceV2Server = (*mockGRPCServer)(nil)

type mockBatchCreateLogsHandler func(*api.BatchCreateLogsRequest) (*api.BatchCreateLogsResponse, error)

func newMockGRPCServer(t *testing.T, handler mockBatchCreateLogsHandler) (*mockGRPCServer, string) {
mockServer := &mockGRPCServer{
srv: grpc.NewServer(),
handler: handler,
}
ln, err := net.Listen("tcp", "localhost:")
require.NoError(t, err)

mockServer.srv.RegisterService(&api.IngestionServiceV2_ServiceDesc, mockServer)
go func() {
require.NoError(t, mockServer.srv.Serve(ln))
}()
return mockServer, ln.Addr().String()
}

func (s *mockGRPCServer) BatchCreateEvents(_ context.Context, _ *api.BatchCreateEventsRequest) (*api.BatchCreateEventsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "TODO")
}
func (s *mockGRPCServer) BatchCreateLogs(_ context.Context, req *api.BatchCreateLogsRequest) (*api.BatchCreateLogsResponse, error) {
s.requests++
return s.handler(req)
}

func TestGRPCExporter(t *testing.T) {
// Override the token source so that we don't have to provide real credentials
secureTokenSource := tokenSource
defer func() {
tokenSource = secureTokenSource
}()
tokenSource = func(context.Context, *Config) (oauth2.TokenSource, error) {
return &emptyTokenSource{}, nil
}

// By default, tests will apply the following changes to NewFactory.CreateDefaultConfig()
defaultCfgMod := func(cfg *Config) {
cfg.Protocol = protocolGRPC
cfg.CustomerID = "00000000-1111-2222-3333-444444444444"
cfg.LogType = "FAKE"
cfg.QueueConfig.Enabled = false
cfg.BackOffConfig.Enabled = false
}

testCases := []struct {
name string
handler mockBatchCreateLogsHandler
input plog.Logs
expectedRequests int
expectedErr string
permanentErr bool
}{
{
name: "empty log record",
input: plog.NewLogs(),
expectedRequests: 0,
},
{
name: "single log record",
handler: func(_ *api.BatchCreateLogsRequest) (*api.BatchCreateLogsResponse, error) {
return &api.BatchCreateLogsResponse{}, nil
},
input: func() plog.Logs {
logs := plog.NewLogs()
rls := logs.ResourceLogs().AppendEmpty()
sls := rls.ScopeLogs().AppendEmpty()
lrs := sls.LogRecords().AppendEmpty()
lrs.Body().SetStr("Test")
return logs
}(),
expectedRequests: 1,
},
// TODO test splitting large payloads
{
name: "transient_error",
handler: func(_ *api.BatchCreateLogsRequest) (*api.BatchCreateLogsResponse, error) {
return nil, status.Error(codes.Unavailable, "Service Unavailable")
},
input: func() plog.Logs {
logs := plog.NewLogs()
rls := logs.ResourceLogs().AppendEmpty()
sls := rls.ScopeLogs().AppendEmpty()
lrs := sls.LogRecords().AppendEmpty()
lrs.Body().SetStr("Test")
return logs
}(),
expectedRequests: 1,
expectedErr: "upload logs to chronicle: rpc error: code = Unavailable desc = Service Unavailable",
permanentErr: false,
},
{
name: "permanent_error",
handler: func(_ *api.BatchCreateLogsRequest) (*api.BatchCreateLogsResponse, error) {
return nil, status.Error(codes.Unauthenticated, "Unauthorized")
},
input: func() plog.Logs {
logs := plog.NewLogs()
rls := logs.ResourceLogs().AppendEmpty()
sls := rls.ScopeLogs().AppendEmpty()
lrs := sls.LogRecords().AppendEmpty()
lrs.Body().SetStr("Test")
return logs
}(),
expectedRequests: 1,
expectedErr: "Permanent error: upload logs to chronicle: rpc error: code = Unauthenticated desc = Unauthorized",
permanentErr: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
mockServer, endpoint := newMockGRPCServer(t, tc.handler)
defer mockServer.srv.GracefulStop()

// Override the client params for testing to we can connect to the mock server
secureGPPCClientParams := grpcClientParams
defer func() {
grpcClientParams = secureGPPCClientParams
}()
grpcClientParams = func(string, oauth2.TokenSource) (string, []grpc.DialOption) {
return endpoint, []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
}

f := NewFactory()
cfg := f.CreateDefaultConfig().(*Config)
defaultCfgMod(cfg)
cfg.Endpoint = endpoint

require.NoError(t, cfg.Validate())

ctx := context.Background()
exp, err := f.CreateLogs(ctx, exportertest.NewNopSettings(), cfg)
require.NoError(t, err)
require.NoError(t, exp.Start(ctx, componenttest.NewNopHost()))
defer func() {
require.NoError(t, exp.Shutdown(ctx))
}()

err = exp.ConsumeLogs(ctx, tc.input)
if tc.expectedErr == "" {
require.NoError(t, err)
} else {
require.EqualError(t, err, tc.expectedErr)
require.Equal(t, tc.permanentErr, consumererror.IsPermanent(err))
}

require.Equal(t, tc.expectedRequests, mockServer.requests)
})
}
}
2 changes: 0 additions & 2 deletions exporter/chronicleexporter/hostmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,8 @@ func (hmr *hostMetricsReporter) start() {
ctx, cancel := context.WithCancel(context.Background())
hmr.cancel = cancel
hmr.wg.Add(1)

go func() {
defer hmr.wg.Done()

for {
select {
case <-ctx.Done():
Expand Down
Loading