Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Minor cleanup of chronicle exporter names #2046

Merged
merged 1 commit into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions exporter/chronicleexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ import (
const (
grpcScope = "https://www.googleapis.com/auth/malachite-ingestion"
httpScope = "https://www.googleapis.com/auth/cloud-platform"

baseEndpoint = "malachiteingestion-pa.googleapis.com"
)

type chronicleExporter struct {
Expand All @@ -63,7 +61,7 @@ type chronicleExporter struct {
grpcConn *grpc.ClientConn
wg sync.WaitGroup
cancel context.CancelFunc
metrics *exporterMetrics
metrics *hostMetricsReporter

// fields used for HTTP
httpClient *http.Client
Expand All @@ -88,7 +86,7 @@ func newExporter(cfg *Config, params exporter.Settings, collectorID, exporterID
return &chronicleExporter{
cfg: cfg,
logger: params.Logger,
metrics: newExporterMetrics(uuidCID[:], customerID[:], exporterID, cfg.Namespace),
metrics: newHostMetricsReporter(uuidCID[:], customerID[:], exporterID, cfg.Namespace),
marshaler: marshaller,
collectorID: collectorID,
exporterID: exporterID,
Expand Down
12 changes: 6 additions & 6 deletions exporter/chronicleexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
func TestLogsDataPusher(t *testing.T) {

// Set up configuration, logger, and context
cfg := Config{Endpoint: baseEndpoint}
cfg := Config{Endpoint: defaultEndpoint}
ctx := context.Background()

testCases := []struct {
Expand All @@ -51,7 +51,7 @@ func TestLogsDataPusher(t *testing.T) {
marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return([]*api.BatchCreateLogsRequest{{}}, nil)
return &chronicleExporter{
cfg: &cfg,
metrics: newExporterMetrics([]byte{}, []byte{}, "", cfg.Namespace),
metrics: newHostMetricsReporter([]byte{}, []byte{}, "", cfg.Namespace),
logger: zap.NewNop(),
grpcClient: mockClient,
marshaler: marshaller,
Expand All @@ -70,7 +70,7 @@ func TestLogsDataPusher(t *testing.T) {
marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return([]*api.BatchCreateLogsRequest{{}}, nil)
return &chronicleExporter{
cfg: &cfg,
metrics: newExporterMetrics([]byte{}, []byte{}, "", cfg.Namespace),
metrics: newHostMetricsReporter([]byte{}, []byte{}, "", cfg.Namespace),
logger: zap.NewNop(),
grpcClient: mockClient,
marshaler: marshaller,
Expand All @@ -91,7 +91,7 @@ func TestLogsDataPusher(t *testing.T) {
marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return([]*api.BatchCreateLogsRequest{{}}, nil)
return &chronicleExporter{
cfg: &cfg,
metrics: newExporterMetrics([]byte{}, []byte{}, "", cfg.Namespace),
metrics: newHostMetricsReporter([]byte{}, []byte{}, "", cfg.Namespace),
logger: zap.NewNop(),
grpcClient: mockClient,
marshaler: marshaller,
Expand All @@ -113,7 +113,7 @@ func TestLogsDataPusher(t *testing.T) {
marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return(nil, errors.New("marshal error"))
return &chronicleExporter{
cfg: &cfg,
metrics: newExporterMetrics([]byte{}, []byte{}, "", cfg.Namespace),
metrics: newHostMetricsReporter([]byte{}, []byte{}, "", cfg.Namespace),
logger: zap.NewNop(),
grpcClient: mockClient,
marshaler: marshaller,
Expand All @@ -133,7 +133,7 @@ func TestLogsDataPusher(t *testing.T) {
marshaller.On("MarshalRawLogs", mock.Anything, mock.Anything).Return([]*api.BatchCreateLogsRequest{}, nil)
return &chronicleExporter{
cfg: &cfg,
metrics: newExporterMetrics([]byte{}, []byte{}, "", cfg.Namespace),
metrics: newHostMetricsReporter([]byte{}, []byte{}, "", cfg.Namespace),
logger: zap.NewNop(),
grpcClient: mockClient,
marshaler: marshaller,
Expand Down
13 changes: 8 additions & 5 deletions exporter/chronicleexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,13 @@ func NewFactory() exporter.Factory {
exporter.WithLogs(createLogsExporter, metadata.LogsStability))
}

const defaultBatchLogCountLimitGRPC = 1000
const defaultBatchRequestSizeLimitGRPC = 1048576
const defaultBatchLogCountLimitHTTP = 1000
const defaultBatchRequestSizeLimitHTTP = 1048576
const (
defaultEndpoint = "malachiteingestion-pa.googleapis.com"
defaultBatchLogCountLimitGRPC = 1000
defaultBatchRequestSizeLimitGRPC = 1048576
defaultBatchLogCountLimitHTTP = 1000
defaultBatchRequestSizeLimitHTTP = 1048576
)

// createDefaultConfig creates the default configuration for the exporter.
func createDefaultConfig() component.Config {
Expand All @@ -48,9 +51,9 @@ func createDefaultConfig() component.Config {
QueueConfig: exporterhelper.NewDefaultQueueConfig(),
BackOffConfig: configretry.NewDefaultBackOffConfig(),
OverrideLogType: true,
Endpoint: baseEndpoint,
Compression: noCompression,
CollectAgentMetrics: true,
Endpoint: defaultEndpoint,
BatchLogCountLimitGRPC: defaultBatchLogCountLimitGRPC,
BatchRequestSizeLimitGRPC: defaultBatchRequestSizeLimitGRPC,
BatchLogCountLimitHTTP: defaultBatchLogCountLimitHTTP,
Expand Down
68 changes: 34 additions & 34 deletions exporter/chronicleexporter/hostmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)

type exporterMetrics struct {
type hostMetricsReporter struct {
mutex sync.Mutex
agentID []byte
customerID []byte
Expand All @@ -38,9 +38,9 @@ type exporterMetrics struct {
logsSent int64
}

func newExporterMetrics(agentID, customerID []byte, exporterID, namespace string) *exporterMetrics {
func newHostMetricsReporter(agentID, customerID []byte, exporterID, namespace string) *hostMetricsReporter {
now := timestamppb.Now()
return &exporterMetrics{
return &hostMetricsReporter{
agentID: agentID,
exporterID: exporterID,
startTime: now,
Expand All @@ -54,61 +54,61 @@ func newExporterMetrics(agentID, customerID []byte, exporterID, namespace string
}
}

func (cm *exporterMetrics) getAndReset() *api.BatchCreateEventsRequest {
cm.mutex.Lock()
defer cm.mutex.Unlock()
func (hmr *hostMetricsReporter) getAndReset() *api.BatchCreateEventsRequest {
hmr.mutex.Lock()
defer hmr.mutex.Unlock()

now := timestamppb.Now()
batchID := uuid.New()
source := &api.EventSource{
CollectorId: chronicleCollectorID[:],
Namespace: cm.namespace,
CustomerId: cm.customerID,
Namespace: hmr.namespace,
CustomerId: hmr.customerID,
}

request := &api.BatchCreateEventsRequest{
Batch: &api.EventBatch{
Id: batchID[:],
Source: source,
Type: api.EventBatch_AGENT_STATS,
StartTime: cm.startTime,
StartTime: hmr.startTime,
Events: []*api.Event{
{
Timestamp: now,
CollectionTime: now,
Source: source,
Payload: &api.Event_AgentStats{
AgentStats: cm.stats,
AgentStats: hmr.stats,
},
},
},
},
}

cm.resetStats()
hmr.resetStats()
return request
}

func (cm *exporterMetrics) resetStats() {
cm.stats = &api.AgentStatsEvent{
func (hmr *hostMetricsReporter) resetStats() {
hmr.stats = &api.AgentStatsEvent{
ExporterStats: []*api.ExporterStats{
{
Name: cm.exporterID,
AcceptedSpans: cm.logsSent,
RefusedSpans: cm.logsDropped,
Name: hmr.exporterID,
AcceptedSpans: hmr.logsSent,
RefusedSpans: hmr.logsDropped,
},
},
AgentId: cm.agentID,
StartTime: cm.startTime,
AgentId: hmr.agentID,
StartTime: hmr.startTime,
WindowStartTime: timestamppb.Now(),
}
cm.logsDropped = 0
cm.logsSent = 0
hmr.logsDropped = 0
hmr.logsSent = 0
}

func (cm *exporterMetrics) collectHostMetrics() error {
cm.mutex.Lock()
defer cm.mutex.Unlock()
func (hmr *hostMetricsReporter) collectHostMetrics() error {
hmr.mutex.Lock()
defer hmr.mutex.Unlock()

// Get the current process using the current PID
proc, err := process.NewProcess(int32(os.Getpid()))
Expand All @@ -124,14 +124,14 @@ func (cm *exporterMetrics) collectHostMetrics() error {
totalCPUTime := cpuTimes.User + cpuTimes.System

// convert to milliseconds
cm.stats.ProcessCpuSeconds = int64(totalCPUTime * 1000)
hmr.stats.ProcessCpuSeconds = int64(totalCPUTime * 1000)

// Collect memory usage (RSS)
memInfo, err := proc.MemoryInfo()
if err != nil {
return fmt.Errorf("get memory info: %w", err)
}
cm.stats.ProcessMemoryRss = int64(memInfo.RSS / 1024) // Convert bytes to kilobytes
hmr.stats.ProcessMemoryRss = int64(memInfo.RSS / 1024) // Convert bytes to kilobytes

// Calculate process uptime
startTimeMs, err := proc.CreateTime()
Expand All @@ -140,19 +140,19 @@ func (cm *exporterMetrics) collectHostMetrics() error {
}
startTimeSec := startTimeMs / 1000
currentTimeSec := time.Now().Unix()
cm.stats.ProcessUptime = currentTimeSec - startTimeSec
hmr.stats.ProcessUptime = currentTimeSec - startTimeSec

return nil
}

func (cm *exporterMetrics) updateLastSuccessfulUpload() {
cm.mutex.Lock()
defer cm.mutex.Unlock()
cm.stats.LastSuccessfulUploadTime = timestamppb.Now()
func (hmr *hostMetricsReporter) updateLastSuccessfulUpload() {
hmr.mutex.Lock()
defer hmr.mutex.Unlock()
hmr.stats.LastSuccessfulUploadTime = timestamppb.Now()
}

func (cm *exporterMetrics) addSentLogs(count int64) {
cm.mutex.Lock()
defer cm.mutex.Unlock()
cm.logsSent += count
func (hmr *hostMetricsReporter) addSentLogs(count int64) {
hmr.mutex.Lock()
defer hmr.mutex.Unlock()
hmr.logsSent += count
}