Skip to content

Commit

Permalink
chore: Add new tests for chronicle exporter with http and grpc servers (
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski authored Dec 16, 2024
1 parent 0bb6aaf commit 104caa3
Show file tree
Hide file tree
Showing 5 changed files with 490 additions and 58 deletions.
96 changes: 40 additions & 56 deletions exporter/chronicleexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@ import (
"bytes"
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"net/http"
"os"

"github.com/google/uuid"
"github.com/observiq/bindplane-agent/exporter/chronicleexporter/protos/api"
Expand All @@ -33,7 +31,6 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -82,23 +79,19 @@ func newExporter(cfg *Config, params exporter.Settings, exporterID string) (*chr
}, nil
}

func (ce *chronicleExporter) Start(_ context.Context, _ component.Host) error {
creds, err := loadGoogleCredentials(ce.cfg)
func (ce *chronicleExporter) Start(ctx context.Context, _ component.Host) error {
ts, err := tokenSource(ctx, ce.cfg)
if err != nil {
return fmt.Errorf("load Google credentials: %w", err)
}

if ce.cfg.Protocol == protocolHTTPS {
ce.httpClient = oauth2.NewClient(context.Background(), creds.TokenSource)
ce.httpClient = oauth2.NewClient(context.Background(), ts)
return nil
}

opts := []grpc.DialOption{
// Apply OAuth tokens for each RPC call
grpc.WithPerRPCCredentials(oauth.TokenSource{TokenSource: creds.TokenSource}),
grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
}
conn, err := grpc.NewClient(ce.cfg.Endpoint+":443", opts...)
endpoint, dialOpts := grpcClientParams(ce.cfg.Endpoint, ts)
conn, err := grpc.NewClient(endpoint, dialOpts...)
if err != nil {
return fmt.Errorf("dial: %w", err)
}
Expand Down Expand Up @@ -145,31 +138,6 @@ func (ce *chronicleExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func loadGoogleCredentials(cfg *Config) (*google.Credentials, error) {
scope := grpcScope
if cfg.Protocol == protocolHTTPS {
scope = httpScope
}

switch {
case cfg.Creds != "":
return google.CredentialsFromJSON(context.Background(), []byte(cfg.Creds), scope)
case cfg.CredsFilePath != "":
credsData, err := os.ReadFile(cfg.CredsFilePath)
if err != nil {
return nil, fmt.Errorf("read credentials file: %w", err)
}

if len(credsData) == 0 {
return nil, errors.New("credentials file is empty")
}

return google.CredentialsFromJSON(context.Background(), credsData, scope)
default:
return google.FindDefaultCredentials(context.Background(), scope)
}
}

func (ce *chronicleExporter) logsDataPusher(ctx context.Context, ld plog.Logs) error {
payloads, err := ce.marshaler.MarshalRawLogs(ctx, ld)
if err != nil {
Expand All @@ -178,7 +146,7 @@ func (ce *chronicleExporter) logsDataPusher(ctx context.Context, ld plog.Logs) e

for _, payload := range payloads {
if err := ce.uploadToChronicle(ctx, payload); err != nil {
return fmt.Errorf("upload to chronicle: %w", err)
return err
}
}

Expand Down Expand Up @@ -229,22 +197,14 @@ func (ce *chronicleExporter) logsHTTPDataPusher(ctx context.Context, ld plog.Log
for logType, logTypePayloads := range payloads {
for _, payload := range logTypePayloads {
if err := ce.uploadToChronicleHTTP(ctx, payload, logType); err != nil {
return fmt.Errorf("upload to chronicle: %w", err)
return err
}
}
}

return nil
}

// This uses the DataPlane URL for the request
// URL for the request: https://{region}-chronicle.googleapis.com/{version}/projects/{project}/location/{region}/instances/{customerID}/logTypes/{logtype}/logs:import
func buildEndpoint(cfg *Config, logType string) string {
// Location Endpoint Version Project Location Instance LogType
formatString := "https://%s-%s/%s/projects/%s/locations/%s/instances/%s/logTypes/%s/logs:import"
return fmt.Sprintf(formatString, cfg.Location, cfg.Endpoint, "v1alpha", cfg.Project, cfg.Location, cfg.CustomerID, logType)
}

func (ce *chronicleExporter) uploadToChronicleHTTP(ctx context.Context, logs *api.ImportLogsRequest, logType string) error {

data, err := protojson.Marshal(logs)
Expand All @@ -268,7 +228,7 @@ func (ce *chronicleExporter) uploadToChronicleHTTP(ctx context.Context, logs *ap
body = bytes.NewBuffer(data)
}

request, err := http.NewRequestWithContext(ctx, "POST", buildEndpoint(ce.cfg, logType), body)
request, err := http.NewRequestWithContext(ctx, "POST", httpEndpoint(ce.cfg, logType), body)
if err != nil {
return fmt.Errorf("create request: %w", err)
}
Expand All @@ -286,14 +246,38 @@ func (ce *chronicleExporter) uploadToChronicleHTTP(ctx context.Context, logs *ap
defer resp.Body.Close()

respBody, err := io.ReadAll(resp.Body)
if resp.StatusCode != http.StatusOK {
if err != nil {
ce.set.Logger.Warn("Failed to read response body", zap.Error(err))
} else {
ce.set.Logger.Warn("Received non-OK response from Chronicle", zap.String("status", resp.Status), zap.ByteString("response", respBody))
}
return fmt.Errorf("received non-OK response from Chronicle: %s", resp.Status)
if err == nil && resp.StatusCode == http.StatusOK {
return nil
}

return nil
if err != nil {
ce.set.Logger.Warn("Failed to read response body", zap.Error(err))
} else {
ce.set.Logger.Warn("Received non-OK response from Chronicle", zap.String("status", resp.Status), zap.ByteString("response", respBody))
}

// TODO interpret with https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/internal/coreinternal/errorutil/http.go
statusErr := fmt.Errorf("upload logs to chronicle: %s", resp.Status)
switch resp.StatusCode {
case http.StatusInternalServerError, http.StatusServiceUnavailable: // potentially transient
return statusErr
default:
return consumererror.NewPermanent(statusErr)
}
}

// Override for testing
var grpcClientParams = func(cfgEndpoint string, ts oauth2.TokenSource) (string, []grpc.DialOption) {
return cfgEndpoint + ":443", []grpc.DialOption{
grpc.WithPerRPCCredentials(oauth.TokenSource{TokenSource: ts}),
grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
}
}

// This uses the DataPlane URL for the request
// URL for the request: https://{region}-chronicle.googleapis.com/{version}/projects/{project}/location/{region}/instances/{customerID}
// Override for testing
var httpEndpoint = func(cfg *Config, logType string) string {
formatString := "https://%s-%s/v1alpha/projects/%s/locations/%s/instances/%s/logTypes/%s/logs:import"
return fmt.Sprintf(formatString, cfg.Location, cfg.Endpoint, cfg.Project, cfg.Location, cfg.CustomerID, logType)
}
193 changes: 193 additions & 0 deletions exporter/chronicleexporter/grpc_exporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
// Copyright observIQ, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package chronicleexporter

import (
"context"
"net"
"testing"

"github.com/observiq/bindplane-agent/exporter/chronicleexporter/protos/api"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/pdata/plog"
"golang.org/x/oauth2"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
)

type mockGRPCServer struct {
api.UnimplementedIngestionServiceV2Server
srv *grpc.Server
requests int
handler mockBatchCreateLogsHandler
}

var _ api.IngestionServiceV2Server = (*mockGRPCServer)(nil)

type mockBatchCreateLogsHandler func(*api.BatchCreateLogsRequest) (*api.BatchCreateLogsResponse, error)

func newMockGRPCServer(t *testing.T, handler mockBatchCreateLogsHandler) (*mockGRPCServer, string) {
mockServer := &mockGRPCServer{
srv: grpc.NewServer(),
handler: handler,
}
ln, err := net.Listen("tcp", "localhost:")
require.NoError(t, err)

mockServer.srv.RegisterService(&api.IngestionServiceV2_ServiceDesc, mockServer)
go func() {
require.NoError(t, mockServer.srv.Serve(ln))
}()
return mockServer, ln.Addr().String()
}

func (s *mockGRPCServer) BatchCreateEvents(_ context.Context, _ *api.BatchCreateEventsRequest) (*api.BatchCreateEventsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "TODO")
}
func (s *mockGRPCServer) BatchCreateLogs(_ context.Context, req *api.BatchCreateLogsRequest) (*api.BatchCreateLogsResponse, error) {
s.requests++
return s.handler(req)
}

func TestGRPCExporter(t *testing.T) {
// Override the token source so that we don't have to provide real credentials
secureTokenSource := tokenSource
defer func() {
tokenSource = secureTokenSource
}()
tokenSource = func(context.Context, *Config) (oauth2.TokenSource, error) {
return &emptyTokenSource{}, nil
}

// By default, tests will apply the following changes to NewFactory.CreateDefaultConfig()
defaultCfgMod := func(cfg *Config) {
cfg.Protocol = protocolGRPC
cfg.CustomerID = "00000000-1111-2222-3333-444444444444"
cfg.LogType = "FAKE"
cfg.QueueConfig.Enabled = false
cfg.BackOffConfig.Enabled = false
}

testCases := []struct {
name string
handler mockBatchCreateLogsHandler
input plog.Logs
expectedRequests int
expectedErr string
permanentErr bool
}{
{
name: "empty log record",
input: plog.NewLogs(),
expectedRequests: 0,
},
{
name: "single log record",
handler: func(_ *api.BatchCreateLogsRequest) (*api.BatchCreateLogsResponse, error) {
return &api.BatchCreateLogsResponse{}, nil
},
input: func() plog.Logs {
logs := plog.NewLogs()
rls := logs.ResourceLogs().AppendEmpty()
sls := rls.ScopeLogs().AppendEmpty()
lrs := sls.LogRecords().AppendEmpty()
lrs.Body().SetStr("Test")
return logs
}(),
expectedRequests: 1,
},
// TODO test splitting large payloads
{
name: "transient_error",
handler: func(_ *api.BatchCreateLogsRequest) (*api.BatchCreateLogsResponse, error) {
return nil, status.Error(codes.Unavailable, "Service Unavailable")
},
input: func() plog.Logs {
logs := plog.NewLogs()
rls := logs.ResourceLogs().AppendEmpty()
sls := rls.ScopeLogs().AppendEmpty()
lrs := sls.LogRecords().AppendEmpty()
lrs.Body().SetStr("Test")
return logs
}(),
expectedRequests: 1,
expectedErr: "upload logs to chronicle: rpc error: code = Unavailable desc = Service Unavailable",
permanentErr: false,
},
{
name: "permanent_error",
handler: func(_ *api.BatchCreateLogsRequest) (*api.BatchCreateLogsResponse, error) {
return nil, status.Error(codes.Unauthenticated, "Unauthorized")
},
input: func() plog.Logs {
logs := plog.NewLogs()
rls := logs.ResourceLogs().AppendEmpty()
sls := rls.ScopeLogs().AppendEmpty()
lrs := sls.LogRecords().AppendEmpty()
lrs.Body().SetStr("Test")
return logs
}(),
expectedRequests: 1,
expectedErr: "Permanent error: upload logs to chronicle: rpc error: code = Unauthenticated desc = Unauthorized",
permanentErr: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
mockServer, endpoint := newMockGRPCServer(t, tc.handler)
defer mockServer.srv.GracefulStop()

// Override the client params for testing to we can connect to the mock server
secureGPPCClientParams := grpcClientParams
defer func() {
grpcClientParams = secureGPPCClientParams
}()
grpcClientParams = func(string, oauth2.TokenSource) (string, []grpc.DialOption) {
return endpoint, []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
}

f := NewFactory()
cfg := f.CreateDefaultConfig().(*Config)
defaultCfgMod(cfg)
cfg.Endpoint = endpoint

require.NoError(t, cfg.Validate())

ctx := context.Background()
exp, err := f.CreateLogs(ctx, exportertest.NewNopSettings(), cfg)
require.NoError(t, err)
require.NoError(t, exp.Start(ctx, componenttest.NewNopHost()))
defer func() {
require.NoError(t, exp.Shutdown(ctx))
}()

err = exp.ConsumeLogs(ctx, tc.input)
if tc.expectedErr == "" {
require.NoError(t, err)
} else {
require.EqualError(t, err, tc.expectedErr)
require.Equal(t, tc.permanentErr, consumererror.IsPermanent(err))
}

require.Equal(t, tc.expectedRequests, mockServer.requests)
})
}
}
2 changes: 0 additions & 2 deletions exporter/chronicleexporter/hostmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,8 @@ func (hmr *hostMetricsReporter) start() {
ctx, cancel := context.WithCancel(context.Background())
hmr.cancel = cancel
hmr.wg.Add(1)

go func() {
defer hmr.wg.Done()

for {
select {
case <-ctx.Done():
Expand Down
Loading

0 comments on commit 104caa3

Please sign in to comment.