diff --git a/exporter/chronicleexporter/exporter.go b/exporter/chronicleexporter/exporter.go index 2db383347..90de04f50 100644 --- a/exporter/chronicleexporter/exporter.go +++ b/exporter/chronicleexporter/exporter.go @@ -23,8 +23,6 @@ import ( "io" "net/http" "os" - "sync" - "time" "github.com/google/uuid" "github.com/observiq/bindplane-agent/exporter/chronicleexporter/protos/api" @@ -51,23 +49,21 @@ const ( ) type chronicleExporter struct { - cfg *Config - set component.TelemetrySettings - marshaler logMarshaler - collectorID, exporterID string + cfg *Config + set component.TelemetrySettings + marshaler logMarshaler + exporterID string // fields used for gRPC grpcClient api.IngestionServiceV2Client grpcConn *grpc.ClientConn - wg sync.WaitGroup - cancel context.CancelFunc metrics *hostMetricsReporter // fields used for HTTP httpClient *http.Client } -func newExporter(cfg *Config, params exporter.Settings, collectorID, exporterID string) (*chronicleExporter, error) { +func newExporter(cfg *Config, params exporter.Settings, exporterID string) (*chronicleExporter, error) { customerID, err := uuid.Parse(cfg.CustomerID) if err != nil { return nil, fmt.Errorf("parse customer ID: %w", err) @@ -78,18 +74,11 @@ func newExporter(cfg *Config, params exporter.Settings, collectorID, exporterID return nil, fmt.Errorf("create proto marshaller: %w", err) } - uuidCID, err := uuid.Parse(collectorID) - if err != nil { - return nil, fmt.Errorf("parse collector ID: %w", err) - } - return &chronicleExporter{ - cfg: cfg, - set: params.TelemetrySettings, - metrics: newHostMetricsReporter(uuidCID[:], customerID[:], exporterID, cfg.Namespace), - marshaler: marshaller, - collectorID: collectorID, - exporterID: exporterID, + cfg: cfg, + set: params.TelemetrySettings, + marshaler: marshaller, + exporterID: exporterID, }, nil } @@ -117,10 +106,16 @@ func (ce *chronicleExporter) Start(_ context.Context, _ component.Host) error { ce.grpcClient = api.NewIngestionServiceV2Client(conn) if ce.cfg.CollectAgentMetrics { - ctx, cancel := context.WithCancel(context.Background()) - ce.cancel = cancel - ce.wg.Add(1) - go ce.startHostMetricsCollection(ctx) + f := func(ctx context.Context, request *api.BatchCreateEventsRequest) error { + _, err := ce.grpcClient.BatchCreateEvents(ctx, request) + return err + } + metrics, err := newHostMetricsReporter(ce.cfg, ce.set, ce.exporterID, f) + if err != nil { + return fmt.Errorf("create metrics reporter: %w", err) + } + ce.metrics = metrics + ce.metrics.start() } return nil @@ -135,9 +130,8 @@ func (ce *chronicleExporter) Shutdown(context.Context) error { } return nil } - if ce.cancel != nil { - ce.cancel() - ce.wg.Wait() + if ce.metrics != nil { + ce.metrics.shutdown() } if ce.grpcConn != nil { if err := ce.grpcConn.Close(); err != nil { @@ -192,7 +186,10 @@ func (ce *chronicleExporter) logsDataPusher(ctx context.Context, ld plog.Logs) e } func (ce *chronicleExporter) uploadToChronicle(ctx context.Context, request *api.BatchCreateLogsRequest) error { - totalLogs := int64(len(request.GetBatch().GetEntries())) + if ce.metrics != nil { + totalLogs := int64(len(request.GetBatch().GetEntries())) + defer ce.metrics.recordSent(totalLogs) + } _, err := ce.grpcClient.BatchCreateLogs(ctx, request, ce.buildOptions()...) if err != nil { @@ -210,8 +207,6 @@ func (ce *chronicleExporter) uploadToChronicle(ctx context.Context, request *api } } - ce.metrics.addSentLogs(totalLogs) - ce.metrics.updateLastSuccessfulUpload() return nil } @@ -225,30 +220,6 @@ func (ce *chronicleExporter) buildOptions() []grpc.CallOption { return opts } -func (ce *chronicleExporter) startHostMetricsCollection(ctx context.Context) { - ticker := time.NewTicker(5 * time.Minute) - defer ticker.Stop() - - defer ce.wg.Done() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - err := ce.metrics.collectHostMetrics() - if err != nil { - ce.set.Logger.Error("Failed to collect host metrics", zap.Error(err)) - } - request := ce.metrics.getAndReset() - _, err = ce.grpcClient.BatchCreateEvents(ctx, request, ce.buildOptions()...) - if err != nil { - ce.set.Logger.Error("Failed to upload host metrics", zap.Error(err)) - } - } - } -} - func (ce *chronicleExporter) logsHTTPDataPusher(ctx context.Context, ld plog.Logs) error { payloads, err := ce.marshaler.MarshalRawLogsForHTTP(ctx, ld) if err != nil { diff --git a/exporter/chronicleexporter/exporter_test.go b/exporter/chronicleexporter/exporter_test.go index 44f0d49b0..25e680872 100644 --- a/exporter/chronicleexporter/exporter_test.go +++ b/exporter/chronicleexporter/exporter_test.go @@ -51,7 +51,25 @@ func TestLogsDataPusher(t *testing.T) { marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return([]*api.BatchCreateLogsRequest{{}}, nil) return &chronicleExporter{ cfg: &cfg, - metrics: newHostMetricsReporter([]byte{}, []byte{}, "", cfg.Namespace), + set: componenttest.NewNopTelemetrySettings(), + grpcClient: mockClient, + marshaler: marshaller, + } + }, + setupMocks: func(mockClient *mocks.MockIngestionServiceV2Client) { + mockClient.EXPECT().BatchCreateLogs(gomock.Any(), gomock.Any(), gomock.Any()).Return(&api.BatchCreateLogsResponse{}, nil) + }, + expectedErr: "", + }, + { + desc: "successful push to Chronicle with metrics", + setupExporter: func() *chronicleExporter { + mockClient := mocks.NewMockIngestionServiceV2Client(gomock.NewController(t)) + marshaller := NewMockMarshaler(t) + marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return([]*api.BatchCreateLogsRequest{{}}, nil) + cfg.CollectAgentMetrics = true + return &chronicleExporter{ + cfg: &cfg, set: componenttest.NewNopTelemetrySettings(), grpcClient: mockClient, marshaler: marshaller, @@ -70,7 +88,6 @@ func TestLogsDataPusher(t *testing.T) { marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return([]*api.BatchCreateLogsRequest{{}}, nil) return &chronicleExporter{ cfg: &cfg, - metrics: newHostMetricsReporter([]byte{}, []byte{}, "", cfg.Namespace), set: componenttest.NewNopTelemetrySettings(), grpcClient: mockClient, marshaler: marshaller, @@ -91,7 +108,6 @@ func TestLogsDataPusher(t *testing.T) { marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return([]*api.BatchCreateLogsRequest{{}}, nil) return &chronicleExporter{ cfg: &cfg, - metrics: newHostMetricsReporter([]byte{}, []byte{}, "", cfg.Namespace), set: componenttest.NewNopTelemetrySettings(), grpcClient: mockClient, marshaler: marshaller, @@ -113,7 +129,6 @@ func TestLogsDataPusher(t *testing.T) { marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return(nil, errors.New("marshal error")) return &chronicleExporter{ cfg: &cfg, - metrics: newHostMetricsReporter([]byte{}, []byte{}, "", cfg.Namespace), set: componenttest.NewNopTelemetrySettings(), grpcClient: mockClient, marshaler: marshaller, @@ -133,7 +148,6 @@ func TestLogsDataPusher(t *testing.T) { marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return([]*api.BatchCreateLogsRequest{}, nil) return &chronicleExporter{ cfg: &cfg, - metrics: newHostMetricsReporter([]byte{}, []byte{}, "", cfg.Namespace), set: componenttest.NewNopTelemetrySettings(), grpcClient: mockClient, marshaler: marshaller, diff --git a/exporter/chronicleexporter/factory.go b/exporter/chronicleexporter/factory.go index b50f1d45c..b7a058f2f 100644 --- a/exporter/chronicleexporter/factory.go +++ b/exporter/chronicleexporter/factory.go @@ -18,13 +18,11 @@ import ( "context" "errors" - "github.com/google/uuid" "github.com/observiq/bindplane-agent/exporter/chronicleexporter/internal/metadata" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" - semconv "go.opentelemetry.io/collector/semconv/v1.5.0" ) // NewFactory creates a new Chronicle exporter factory. @@ -72,15 +70,7 @@ func createLogsExporter( return nil, errors.New("invalid config type") } - var cID string - sid, ok := params.Resource.Attributes().Get(semconv.AttributeServiceInstanceID) - if ok { - cID = sid.AsString() - } else { - cID = uuid.New().String() - } - - exp, err := newExporter(chronicleCfg, params, cID, params.ID.String()) + exp, err := newExporter(chronicleCfg, params, params.ID.String()) if err != nil { return nil, err } diff --git a/exporter/chronicleexporter/hostmetrics.go b/exporter/chronicleexporter/hostmetrics.go index 40c3ed48b..a619087df 100644 --- a/exporter/chronicleexporter/hostmetrics.go +++ b/exporter/chronicleexporter/hostmetrics.go @@ -15,6 +15,7 @@ package chronicleexporter import ( + "context" "fmt" "os" "sync" @@ -23,10 +24,18 @@ import ( "github.com/google/uuid" "github.com/observiq/bindplane-agent/exporter/chronicleexporter/protos/api" "github.com/shirou/gopsutil/v3/process" + "go.opentelemetry.io/collector/component" + semconv "go.opentelemetry.io/collector/semconv/v1.5.0" + "go.uber.org/zap" "google.golang.org/protobuf/types/known/timestamppb" ) type hostMetricsReporter struct { + set component.TelemetrySettings + cancel context.CancelFunc + wg sync.WaitGroup + send sendMetricsFunc + mutex sync.Mutex agentID []byte customerID []byte @@ -38,20 +47,67 @@ type hostMetricsReporter struct { logsSent int64 } -func newHostMetricsReporter(agentID, customerID []byte, exporterID, namespace string) *hostMetricsReporter { +type sendMetricsFunc func(context.Context, *api.BatchCreateEventsRequest) error + +func newHostMetricsReporter(cfg *Config, set component.TelemetrySettings, exporterID string, send sendMetricsFunc) (*hostMetricsReporter, error) { + customerID, err := uuid.Parse(cfg.CustomerID) + if err != nil { + return nil, fmt.Errorf("parse customer ID: %w", err) + } + + agentID := uuid.New() + if sid, ok := set.Resource.Attributes().Get(semconv.AttributeServiceInstanceID); ok { + var err error + agentID, err = uuid.Parse(sid.AsString()) + if err != nil { + return nil, fmt.Errorf("parse collector ID: %w", err) + } + } + now := timestamppb.Now() return &hostMetricsReporter{ - agentID: agentID, + set: set, + send: send, + agentID: agentID[:], exporterID: exporterID, startTime: now, - customerID: customerID, - namespace: namespace, + customerID: customerID[:], + namespace: cfg.Namespace, stats: &api.AgentStatsEvent{ + AgentId: agentID[:], WindowStartTime: now, - AgentId: agentID, StartTime: now, }, - } + }, nil +} + +func (hmr *hostMetricsReporter) start() { + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() + + ctx, cancel := context.WithCancel(context.Background()) + hmr.cancel = cancel + hmr.wg.Add(1) + + go func() { + defer hmr.wg.Done() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + err := hmr.collectHostMetrics() + if err != nil { + hmr.set.Logger.Error("Failed to collect host metrics", zap.Error(err)) + } + request := hmr.getAndReset() + if err = hmr.send(ctx, request); err != nil { + hmr.set.Logger.Error("Failed to upload host metrics", zap.Error(err)) + } + } + } + }() } func (hmr *hostMetricsReporter) getAndReset() *api.BatchCreateEventsRequest { @@ -89,6 +145,13 @@ func (hmr *hostMetricsReporter) getAndReset() *api.BatchCreateEventsRequest { return request } +func (hmr *hostMetricsReporter) shutdown() { + if hmr.cancel != nil { + hmr.cancel() + hmr.wg.Wait() + } +} + func (hmr *hostMetricsReporter) resetStats() { hmr.stats = &api.AgentStatsEvent{ ExporterStats: []*api.ExporterStats{ @@ -145,14 +208,9 @@ func (hmr *hostMetricsReporter) collectHostMetrics() error { return nil } -func (hmr *hostMetricsReporter) updateLastSuccessfulUpload() { - hmr.mutex.Lock() - defer hmr.mutex.Unlock() - hmr.stats.LastSuccessfulUploadTime = timestamppb.Now() -} - -func (hmr *hostMetricsReporter) addSentLogs(count int64) { +func (hmr *hostMetricsReporter) recordSent(count int64) { hmr.mutex.Lock() defer hmr.mutex.Unlock() hmr.logsSent += count + hmr.stats.LastSuccessfulUploadTime = timestamppb.Now() }