Skip to content

Commit

Permalink
chore: Minor cleanup of chronicle exporter names
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Dec 12, 2024
1 parent 24d60e5 commit 53bd8b7
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 49 deletions.
6 changes: 2 additions & 4 deletions exporter/chronicleexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ import (
const (
grpcScope = "https://www.googleapis.com/auth/malachite-ingestion"
httpScope = "https://www.googleapis.com/auth/cloud-platform"

baseEndpoint = "malachiteingestion-pa.googleapis.com"
)

type chronicleExporter struct {
Expand All @@ -63,7 +61,7 @@ type chronicleExporter struct {
grpcConn *grpc.ClientConn
wg sync.WaitGroup
cancel context.CancelFunc
metrics *exporterMetrics
metrics *hostMetricsReporter

// fields used for HTTP
httpClient *http.Client
Expand All @@ -88,7 +86,7 @@ func newExporter(cfg *Config, params exporter.Settings, collectorID, exporterID
return &chronicleExporter{
cfg: cfg,
logger: params.Logger,
metrics: newExporterMetrics(uuidCID[:], customerID[:], exporterID, cfg.Namespace),
metrics: newHostMetricsReporter(uuidCID[:], customerID[:], exporterID, cfg.Namespace),
marshaler: marshaller,
collectorID: collectorID,
exporterID: exporterID,
Expand Down
12 changes: 6 additions & 6 deletions exporter/chronicleexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
func TestLogsDataPusher(t *testing.T) {

// Set up configuration, logger, and context
cfg := Config{Endpoint: baseEndpoint}
cfg := Config{Endpoint: defaultEndpoint}
ctx := context.Background()

testCases := []struct {
Expand All @@ -51,7 +51,7 @@ func TestLogsDataPusher(t *testing.T) {
marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return([]*api.BatchCreateLogsRequest{{}}, nil)
return &chronicleExporter{
cfg: &cfg,
metrics: newExporterMetrics([]byte{}, []byte{}, "", cfg.Namespace),
metrics: newHostMetricsReporter([]byte{}, []byte{}, "", cfg.Namespace),
logger: zap.NewNop(),
grpcClient: mockClient,
marshaler: marshaller,
Expand All @@ -70,7 +70,7 @@ func TestLogsDataPusher(t *testing.T) {
marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return([]*api.BatchCreateLogsRequest{{}}, nil)
return &chronicleExporter{
cfg: &cfg,
metrics: newExporterMetrics([]byte{}, []byte{}, "", cfg.Namespace),
metrics: newHostMetricsReporter([]byte{}, []byte{}, "", cfg.Namespace),
logger: zap.NewNop(),
grpcClient: mockClient,
marshaler: marshaller,
Expand All @@ -91,7 +91,7 @@ func TestLogsDataPusher(t *testing.T) {
marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return([]*api.BatchCreateLogsRequest{{}}, nil)
return &chronicleExporter{
cfg: &cfg,
metrics: newExporterMetrics([]byte{}, []byte{}, "", cfg.Namespace),
metrics: newHostMetricsReporter([]byte{}, []byte{}, "", cfg.Namespace),
logger: zap.NewNop(),
grpcClient: mockClient,
marshaler: marshaller,
Expand All @@ -113,7 +113,7 @@ func TestLogsDataPusher(t *testing.T) {
marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return(nil, errors.New("marshal error"))
return &chronicleExporter{
cfg: &cfg,
metrics: newExporterMetrics([]byte{}, []byte{}, "", cfg.Namespace),
metrics: newHostMetricsReporter([]byte{}, []byte{}, "", cfg.Namespace),
logger: zap.NewNop(),
grpcClient: mockClient,
marshaler: marshaller,
Expand All @@ -133,7 +133,7 @@ func TestLogsDataPusher(t *testing.T) {
marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return([]*api.BatchCreateLogsRequest{}, nil)
return &chronicleExporter{
cfg: &cfg,
metrics: newExporterMetrics([]byte{}, []byte{}, "", cfg.Namespace),
metrics: newHostMetricsReporter([]byte{}, []byte{}, "", cfg.Namespace),
logger: zap.NewNop(),
grpcClient: mockClient,
marshaler: marshaller,
Expand Down
13 changes: 8 additions & 5 deletions exporter/chronicleexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,13 @@ func NewFactory() exporter.Factory {
exporter.WithLogs(createLogsExporter, metadata.LogsStability))
}

const defaultBatchLogCountLimitGRPC = 1000
const defaultBatchRequestSizeLimitGRPC = 1048576
const defaultBatchLogCountLimitHTTP = 1000
const defaultBatchRequestSizeLimitHTTP = 1048576
const (
defaultEndpoint = "malachiteingestion-pa.googleapis.com"
defaultBatchLogCountLimitGRPC = 1000
defaultBatchRequestSizeLimitGRPC = 1048576
defaultBatchLogCountLimitHTTP = 1000
defaultBatchRequestSizeLimitHTTP = 1048576
)

// createDefaultConfig creates the default configuration for the exporter.
func createDefaultConfig() component.Config {
Expand All @@ -48,9 +51,9 @@ func createDefaultConfig() component.Config {
QueueConfig: exporterhelper.NewDefaultQueueConfig(),
BackOffConfig: configretry.NewDefaultBackOffConfig(),
OverrideLogType: true,
Endpoint: baseEndpoint,
Compression: noCompression,
CollectAgentMetrics: true,
Endpoint: defaultEndpoint,
BatchLogCountLimitGRPC: defaultBatchLogCountLimitGRPC,
BatchRequestSizeLimitGRPC: defaultBatchRequestSizeLimitGRPC,
BatchLogCountLimitHTTP: defaultBatchLogCountLimitHTTP,
Expand Down
68 changes: 34 additions & 34 deletions exporter/chronicleexporter/hostmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)

type exporterMetrics struct {
type hostMetricsReporter struct {
mutex sync.Mutex
agentID []byte
customerID []byte
Expand All @@ -38,9 +38,9 @@ type exporterMetrics struct {
logsSent int64
}

func newExporterMetrics(agentID, customerID []byte, exporterID, namespace string) *exporterMetrics {
func newHostMetricsReporter(agentID, customerID []byte, exporterID, namespace string) *hostMetricsReporter {
now := timestamppb.Now()
return &exporterMetrics{
return &hostMetricsReporter{
agentID: agentID,
exporterID: exporterID,
startTime: now,
Expand All @@ -54,61 +54,61 @@ func newExporterMetrics(agentID, customerID []byte, exporterID, namespace string
}
}

func (cm *exporterMetrics) getAndReset() *api.BatchCreateEventsRequest {
cm.mutex.Lock()
defer cm.mutex.Unlock()
func (hmr *hostMetricsReporter) getAndReset() *api.BatchCreateEventsRequest {
hmr.mutex.Lock()
defer hmr.mutex.Unlock()

now := timestamppb.Now()
batchID := uuid.New()
source := &api.EventSource{
CollectorId: chronicleCollectorID[:],
Namespace: cm.namespace,
CustomerId: cm.customerID,
Namespace: hmr.namespace,
CustomerId: hmr.customerID,
}

request := &api.BatchCreateEventsRequest{
Batch: &api.EventBatch{
Id: batchID[:],
Source: source,
Type: api.EventBatch_AGENT_STATS,
StartTime: cm.startTime,
StartTime: hmr.startTime,
Events: []*api.Event{
{
Timestamp: now,
CollectionTime: now,
Source: source,
Payload: &api.Event_AgentStats{
AgentStats: cm.stats,
AgentStats: hmr.stats,
},
},
},
},
}

cm.resetStats()
hmr.resetStats()
return request
}

func (cm *exporterMetrics) resetStats() {
cm.stats = &api.AgentStatsEvent{
func (hmr *hostMetricsReporter) resetStats() {
hmr.stats = &api.AgentStatsEvent{
ExporterStats: []*api.ExporterStats{
{
Name: cm.exporterID,
AcceptedSpans: cm.logsSent,
RefusedSpans: cm.logsDropped,
Name: hmr.exporterID,
AcceptedSpans: hmr.logsSent,
RefusedSpans: hmr.logsDropped,
},
},
AgentId: cm.agentID,
StartTime: cm.startTime,
AgentId: hmr.agentID,
StartTime: hmr.startTime,
WindowStartTime: timestamppb.Now(),
}
cm.logsDropped = 0
cm.logsSent = 0
hmr.logsDropped = 0
hmr.logsSent = 0
}

func (cm *exporterMetrics) collectHostMetrics() error {
cm.mutex.Lock()
defer cm.mutex.Unlock()
func (hmr *hostMetricsReporter) collectHostMetrics() error {
hmr.mutex.Lock()
defer hmr.mutex.Unlock()

// Get the current process using the current PID
proc, err := process.NewProcess(int32(os.Getpid()))
Expand All @@ -124,14 +124,14 @@ func (cm *exporterMetrics) collectHostMetrics() error {
totalCPUTime := cpuTimes.User + cpuTimes.System

// convert to milliseconds
cm.stats.ProcessCpuSeconds = int64(totalCPUTime * 1000)
hmr.stats.ProcessCpuSeconds = int64(totalCPUTime * 1000)

// Collect memory usage (RSS)
memInfo, err := proc.MemoryInfo()
if err != nil {
return fmt.Errorf("get memory info: %w", err)
}
cm.stats.ProcessMemoryRss = int64(memInfo.RSS / 1024) // Convert bytes to kilobytes
hmr.stats.ProcessMemoryRss = int64(memInfo.RSS / 1024) // Convert bytes to kilobytes

// Calculate process uptime
startTimeMs, err := proc.CreateTime()
Expand All @@ -140,19 +140,19 @@ func (cm *exporterMetrics) collectHostMetrics() error {
}
startTimeSec := startTimeMs / 1000
currentTimeSec := time.Now().Unix()
cm.stats.ProcessUptime = currentTimeSec - startTimeSec
hmr.stats.ProcessUptime = currentTimeSec - startTimeSec

return nil
}

func (cm *exporterMetrics) updateLastSuccessfulUpload() {
cm.mutex.Lock()
defer cm.mutex.Unlock()
cm.stats.LastSuccessfulUploadTime = timestamppb.Now()
func (hmr *hostMetricsReporter) updateLastSuccessfulUpload() {
hmr.mutex.Lock()
defer hmr.mutex.Unlock()
hmr.stats.LastSuccessfulUploadTime = timestamppb.Now()
}

func (cm *exporterMetrics) addSentLogs(count int64) {
cm.mutex.Lock()
defer cm.mutex.Unlock()
cm.logsSent += count
func (hmr *hostMetricsReporter) addSentLogs(count int64) {
hmr.mutex.Lock()
defer hmr.mutex.Unlock()
hmr.logsSent += count
}

0 comments on commit 53bd8b7

Please sign in to comment.