Skip to content

Commit

Permalink
chore: Pull metrics-specific concerns into hostMetricsReporter
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Dec 12, 2024
1 parent e7ce981 commit 225be31
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 82 deletions.
79 changes: 25 additions & 54 deletions exporter/chronicleexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
"io"
"net/http"
"os"
"sync"
"time"

"github.com/google/uuid"
"github.com/observiq/bindplane-agent/exporter/chronicleexporter/protos/api"
Expand All @@ -51,23 +49,21 @@ const (
)

type chronicleExporter struct {
cfg *Config
set component.TelemetrySettings
marshaler logMarshaler
collectorID, exporterID string
cfg *Config
set component.TelemetrySettings
marshaler logMarshaler
exporterID string

// fields used for gRPC
grpcClient api.IngestionServiceV2Client
grpcConn *grpc.ClientConn
wg sync.WaitGroup
cancel context.CancelFunc
metrics *hostMetricsReporter

// fields used for HTTP
httpClient *http.Client
}

func newExporter(cfg *Config, params exporter.Settings, collectorID, exporterID string) (*chronicleExporter, error) {
func newExporter(cfg *Config, params exporter.Settings, exporterID string) (*chronicleExporter, error) {
customerID, err := uuid.Parse(cfg.CustomerID)
if err != nil {
return nil, fmt.Errorf("parse customer ID: %w", err)
Expand All @@ -78,18 +74,11 @@ func newExporter(cfg *Config, params exporter.Settings, collectorID, exporterID
return nil, fmt.Errorf("create proto marshaller: %w", err)
}

uuidCID, err := uuid.Parse(collectorID)
if err != nil {
return nil, fmt.Errorf("parse collector ID: %w", err)
}

return &chronicleExporter{
cfg: cfg,
set: params.TelemetrySettings,
metrics: newHostMetricsReporter(uuidCID[:], customerID[:], exporterID, cfg.Namespace),
marshaler: marshaller,
collectorID: collectorID,
exporterID: exporterID,
cfg: cfg,
set: params.TelemetrySettings,
marshaler: marshaller,
exporterID: exporterID,
}, nil
}

Expand Down Expand Up @@ -117,10 +106,16 @@ func (ce *chronicleExporter) Start(_ context.Context, _ component.Host) error {
ce.grpcClient = api.NewIngestionServiceV2Client(conn)

if ce.cfg.CollectAgentMetrics {
ctx, cancel := context.WithCancel(context.Background())
ce.cancel = cancel
ce.wg.Add(1)
go ce.startHostMetricsCollection(ctx)
f := func(ctx context.Context, request *api.BatchCreateEventsRequest) error {
_, err := ce.grpcClient.BatchCreateEvents(ctx, request)
return err
}
metrics, err := newHostMetricsReporter(ce.cfg, ce.set, ce.exporterID, f)
if err != nil {
return fmt.Errorf("create metrics reporter: %w", err)
}
ce.metrics = metrics
return ce.metrics.start()
}

return nil
Expand All @@ -135,9 +130,8 @@ func (ce *chronicleExporter) Shutdown(context.Context) error {
}
return nil
}
if ce.cancel != nil {
ce.cancel()
ce.wg.Wait()
if ce.metrics != nil {
ce.metrics.shutdown()
}
if ce.grpcConn != nil {
if err := ce.grpcConn.Close(); err != nil {
Expand Down Expand Up @@ -192,7 +186,10 @@ func (ce *chronicleExporter) logsDataPusher(ctx context.Context, ld plog.Logs) e
}

func (ce *chronicleExporter) uploadToChronicle(ctx context.Context, request *api.BatchCreateLogsRequest) error {
totalLogs := int64(len(request.GetBatch().GetEntries()))
if ce.metrics != nil {
totalLogs := int64(len(request.GetBatch().GetEntries()))
defer ce.metrics.recordSent(totalLogs)
}

_, err := ce.grpcClient.BatchCreateLogs(ctx, request, ce.buildOptions()...)
if err != nil {
Expand All @@ -210,8 +207,6 @@ func (ce *chronicleExporter) uploadToChronicle(ctx context.Context, request *api
}
}

ce.metrics.addSentLogs(totalLogs)
ce.metrics.updateLastSuccessfulUpload()
return nil
}

Expand All @@ -225,30 +220,6 @@ func (ce *chronicleExporter) buildOptions() []grpc.CallOption {
return opts
}

func (ce *chronicleExporter) startHostMetricsCollection(ctx context.Context) {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()

defer ce.wg.Done()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
err := ce.metrics.collectHostMetrics()
if err != nil {
ce.set.Logger.Error("Failed to collect host metrics", zap.Error(err))
}
request := ce.metrics.getAndReset()
_, err = ce.grpcClient.BatchCreateEvents(ctx, request, ce.buildOptions()...)
if err != nil {
ce.set.Logger.Error("Failed to upload host metrics", zap.Error(err))
}
}
}
}

func (ce *chronicleExporter) logsHTTPDataPusher(ctx context.Context, ld plog.Logs) error {
payloads, err := ce.marshaler.MarshalRawLogsForHTTP(ctx, ld)
if err != nil {
Expand Down
5 changes: 0 additions & 5 deletions exporter/chronicleexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ func TestLogsDataPusher(t *testing.T) {
marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return([]*api.BatchCreateLogsRequest{{}}, nil)
return &chronicleExporter{
cfg: &cfg,
metrics: newHostMetricsReporter([]byte{}, []byte{}, "", cfg.Namespace),
set: componenttest.NewNopTelemetrySettings(),
grpcClient: mockClient,
marshaler: marshaller,
Expand All @@ -70,7 +69,6 @@ func TestLogsDataPusher(t *testing.T) {
marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return([]*api.BatchCreateLogsRequest{{}}, nil)
return &chronicleExporter{
cfg: &cfg,
metrics: newHostMetricsReporter([]byte{}, []byte{}, "", cfg.Namespace),
set: componenttest.NewNopTelemetrySettings(),
grpcClient: mockClient,
marshaler: marshaller,
Expand All @@ -91,7 +89,6 @@ func TestLogsDataPusher(t *testing.T) {
marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return([]*api.BatchCreateLogsRequest{{}}, nil)
return &chronicleExporter{
cfg: &cfg,
metrics: newHostMetricsReporter([]byte{}, []byte{}, "", cfg.Namespace),
set: componenttest.NewNopTelemetrySettings(),
grpcClient: mockClient,
marshaler: marshaller,
Expand All @@ -113,7 +110,6 @@ func TestLogsDataPusher(t *testing.T) {
marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return(nil, errors.New("marshal error"))
return &chronicleExporter{
cfg: &cfg,
metrics: newHostMetricsReporter([]byte{}, []byte{}, "", cfg.Namespace),
set: componenttest.NewNopTelemetrySettings(),
grpcClient: mockClient,
marshaler: marshaller,
Expand All @@ -133,7 +129,6 @@ func TestLogsDataPusher(t *testing.T) {
marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return([]*api.BatchCreateLogsRequest{}, nil)
return &chronicleExporter{
cfg: &cfg,
metrics: newHostMetricsReporter([]byte{}, []byte{}, "", cfg.Namespace),
set: componenttest.NewNopTelemetrySettings(),
grpcClient: mockClient,
marshaler: marshaller,
Expand Down
12 changes: 1 addition & 11 deletions exporter/chronicleexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@ import (
"context"
"errors"

"github.com/google/uuid"
"github.com/observiq/bindplane-agent/exporter/chronicleexporter/internal/metadata"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
semconv "go.opentelemetry.io/collector/semconv/v1.5.0"
)

// NewFactory creates a new Chronicle exporter factory.
Expand Down Expand Up @@ -72,15 +70,7 @@ func createLogsExporter(
return nil, errors.New("invalid config type")
}

var cID string
sid, ok := params.Resource.Attributes().Get(semconv.AttributeServiceInstanceID)
if ok {
cID = sid.AsString()
} else {
cID = uuid.New().String()
}

exp, err := newExporter(chronicleCfg, params, cID, params.ID.String())
exp, err := newExporter(chronicleCfg, params, params.ID.String())
if err != nil {
return nil, err
}
Expand Down
79 changes: 67 additions & 12 deletions exporter/chronicleexporter/hostmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package chronicleexporter

import (
"context"
"fmt"
"os"
"sync"
Expand All @@ -23,10 +24,18 @@ import (
"github.com/google/uuid"
"github.com/observiq/bindplane-agent/exporter/chronicleexporter/protos/api"
"github.com/shirou/gopsutil/v3/process"
"go.opentelemetry.io/collector/component"
semconv "go.opentelemetry.io/collector/semconv/v1.5.0"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
)

type hostMetricsReporter struct {
set component.TelemetrySettings
cancel context.CancelFunc
wg sync.WaitGroup
send sendMetricsFunc

mutex sync.Mutex
agentID []byte
customerID []byte
Expand All @@ -38,19 +47,63 @@ type hostMetricsReporter struct {
logsSent int64
}

func newHostMetricsReporter(agentID, customerID []byte, exporterID, namespace string) *hostMetricsReporter {
type sendMetricsFunc func(context.Context, *api.BatchCreateEventsRequest) error

func newHostMetricsReporter(cfg *Config, set component.TelemetrySettings, exporterID string, send sendMetricsFunc) (*hostMetricsReporter, error) {
customerID, err := uuid.Parse(cfg.CustomerID)
if err != nil {
return nil, fmt.Errorf("parse customer ID: %w", err)
}

agentID := uuid.New()
if sid, ok := set.Resource.Attributes().Get(semconv.AttributeServiceInstanceID); ok {
var err error
agentID, err = uuid.Parse(sid.AsString())
if err != nil {
return nil, fmt.Errorf("parse collector ID: %w", err)
}
}

now := timestamppb.Now()
return &hostMetricsReporter{
agentID: agentID,
set: set,
send: send,
agentID: agentID[:],
exporterID: exporterID,
startTime: now,
customerID: customerID,
namespace: namespace,
customerID: customerID[:],
namespace: cfg.Namespace,
stats: &api.AgentStatsEvent{
AgentId: agentID[:],
WindowStartTime: now,
AgentId: agentID,
StartTime: now,
},
}, nil
}

func (hmr *hostMetricsReporter) start() error {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()

ctx, cancel := context.WithCancel(context.Background())
hmr.cancel = cancel
hmr.wg.Add(1)
defer hmr.wg.Done()

for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
err := hmr.collectHostMetrics()
if err != nil {
hmr.set.Logger.Error("Failed to collect host metrics", zap.Error(err))
}
request := hmr.getAndReset()
if err = hmr.send(ctx, request); err != nil {
hmr.set.Logger.Error("Failed to upload host metrics", zap.Error(err))
}
}
}
}

Expand Down Expand Up @@ -89,6 +142,13 @@ func (hmr *hostMetricsReporter) getAndReset() *api.BatchCreateEventsRequest {
return request
}

func (hmr *hostMetricsReporter) shutdown() {
if hmr.cancel != nil {
hmr.cancel()
hmr.wg.Wait()
}
}

func (hmr *hostMetricsReporter) resetStats() {
hmr.stats = &api.AgentStatsEvent{
ExporterStats: []*api.ExporterStats{
Expand Down Expand Up @@ -145,14 +205,9 @@ func (hmr *hostMetricsReporter) collectHostMetrics() error {
return nil
}

func (hmr *hostMetricsReporter) updateLastSuccessfulUpload() {
hmr.mutex.Lock()
defer hmr.mutex.Unlock()
hmr.stats.LastSuccessfulUploadTime = timestamppb.Now()
}

func (hmr *hostMetricsReporter) addSentLogs(count int64) {
func (hmr *hostMetricsReporter) recordSent(count int64) {
hmr.mutex.Lock()
defer hmr.mutex.Unlock()
hmr.logsSent += count
hmr.stats.LastSuccessfulUploadTime = timestamppb.Now()
}

0 comments on commit 225be31

Please sign in to comment.