diff --git a/counter/counter.go b/counter/counter.go index 902e161c7..963efc006 100644 --- a/counter/counter.go +++ b/counter/counter.go @@ -20,33 +20,45 @@ import "encoding/json" // TelemetryCounter tracks the number of times a set of resource and attribute dimensions have been seen. type TelemetryCounter struct { resources map[string]*ResourceCounter + commands chan func() } // NewTelemetryCounter creates a new TelemetryCounter. func NewTelemetryCounter() *TelemetryCounter { - return &TelemetryCounter{ + t := &TelemetryCounter{ resources: make(map[string]*ResourceCounter), + commands: make(chan func()), } + go t.run() + return t } -// Add increments the counter with the supplied dimensions. -func (t *TelemetryCounter) Add(resource, attributes map[string]any) { - key := getDimensionKey(resource) - if _, ok := t.resources[key]; !ok { - t.resources[key] = NewResourceCounter(resource) +// run listens for commands to modify or read the resources. +func (t *TelemetryCounter) run() { + for cmd := range t.commands { + cmd() } - - t.resources[key].Add(attributes) } -// Resources returns a map of resource ID to a counter for that resource. -func (t TelemetryCounter) Resources() map[string]*ResourceCounter { - return t.resources +// Add increments the counter with the supplied dimensions. +func (t *TelemetryCounter) Add(resource, attributes map[string]any) { + t.commands <- func() { + key := getDimensionKey(resource) + if _, ok := t.resources[key]; !ok { + t.resources[key] = newResourceCounter(resource) + } + t.resources[key].add(attributes) + } } -// Reset resets the counter. -func (t *TelemetryCounter) Reset() { - t.resources = make(map[string]*ResourceCounter) +// Resources returns a map of resource ID to a counter for that resource and resets the counter. +func (t *TelemetryCounter) Resources() map[string]*ResourceCounter { + result := make(chan map[string]*ResourceCounter) + t.commands <- func() { + result <- t.resources + t.resources = make(map[string]*ResourceCounter) // Reset the counter + } + return <-result } // ResourceCounter dimensions the counter by resource. @@ -55,22 +67,22 @@ type ResourceCounter struct { attributes map[string]*AttributeCounter } -// NewResourceCounter creates a new ResourceCounter. -func NewResourceCounter(values map[string]any) *ResourceCounter { +// newResourceCounter creates a new ResourceCounter. +func newResourceCounter(values map[string]any) *ResourceCounter { return &ResourceCounter{ values: values, attributes: map[string]*AttributeCounter{}, } } -// Add increments the counter with the supplied dimensions. -func (r *ResourceCounter) Add(attributes map[string]any) { +// add increments the counter with the supplied dimensions. +func (r *ResourceCounter) add(attributes map[string]any) { key := getDimensionKey(attributes) if _, ok := r.attributes[key]; !ok { - r.attributes[key] = NewAttributeCounter(attributes) + r.attributes[key] = newAttributeCounter(attributes) } - r.attributes[key].Add() + r.attributes[key].add() } // Attributes returns a map of attribute set ID to a counter for that attribute set. @@ -89,15 +101,15 @@ type AttributeCounter struct { count int } -// NewAttributeCounter creates a new AttributeCounter. -func NewAttributeCounter(values map[string]any) *AttributeCounter { +// newAttributeCounter creates a new AttributeCounter. +func newAttributeCounter(values map[string]any) *AttributeCounter { return &AttributeCounter{ values: values, } } -// Add increments the counter. -func (a *AttributeCounter) Add() { +// add increments the counter. +func (a *AttributeCounter) add() { a.count++ } diff --git a/counter/counter_test.go b/counter/counter_test.go index 3b2b8c71c..f9c45fa2e 100644 --- a/counter/counter_test.go +++ b/counter/counter_test.go @@ -42,10 +42,13 @@ func TestLogCounter(t *testing.T) { attrKey1 := getDimensionKey(attrMap1) attrKey2 := getDimensionKey(attrMap2) - require.Equal(t, 10, counter.resources[resourceKey1].attributes[attrKey1].count) - require.Equal(t, 5, counter.resources[resourceKey1].attributes[attrKey2].count) - require.Equal(t, 1, counter.resources[resourceKey2].attributes[attrKey1].count) + resources := counter.Resources() - counter.Reset() - require.Len(t, counter.resources, 0) + require.Equal(t, 10, resources[resourceKey1].attributes[attrKey1].count) + require.Equal(t, 5, resources[resourceKey1].attributes[attrKey2].count) + require.Equal(t, 1, resources[resourceKey2].attributes[attrKey1].count) + + // Ensure that the counter has been reset + resources = counter.Resources() + require.Len(t, resources, 0) } diff --git a/processor/datapointcountprocessor/processor.go b/processor/datapointcountprocessor/processor.go index f8b0fda75..e0611b6cd 100644 --- a/processor/datapointcountprocessor/processor.go +++ b/processor/datapointcountprocessor/processor.go @@ -42,7 +42,6 @@ type metricCountProcessor struct { logger *zap.Logger cancel context.CancelFunc wg sync.WaitGroup - mux sync.Mutex } // newExprProcessor returns a new processor with expr expressions. @@ -110,8 +109,6 @@ func (p *metricCountProcessor) Shutdown(_ context.Context) error { // ConsumeMetrics processes the metrics. func (p *metricCountProcessor) ConsumeMetrics(ctx context.Context, m pmetric.Metrics) error { - p.mux.Lock() - defer p.mux.Unlock() if p.isOTTL() { p.consumeMetricsOTTL(ctx, m) @@ -202,9 +199,6 @@ func (p *metricCountProcessor) sendMetrics(ctx context.Context) { // createMetrics creates metrics from the counter. The counter is reset after the metrics are created. func (p *metricCountProcessor) createMetrics() pmetric.Metrics { - p.mux.Lock() - defer p.mux.Unlock() - metrics := pmetric.NewMetrics() for _, resource := range p.counter.Resources() { resourceMetrics := metrics.ResourceMetrics().AppendEmpty() @@ -231,9 +225,6 @@ func (p *metricCountProcessor) createMetrics() pmetric.Metrics { } } - - p.counter.Reset() - return metrics } diff --git a/processor/logcountprocessor/processor.go b/processor/logcountprocessor/processor.go index e1fd7bc00..203c173a4 100644 --- a/processor/logcountprocessor/processor.go +++ b/processor/logcountprocessor/processor.go @@ -43,7 +43,6 @@ type logCountProcessor struct { logger *zap.Logger cancel context.CancelFunc wg sync.WaitGroup - mux sync.Mutex } // newProcessor returns a new processor. @@ -105,8 +104,6 @@ func (p *logCountProcessor) Shutdown(_ context.Context) error { // ConsumeLogs processes the logs. func (p *logCountProcessor) ConsumeLogs(ctx context.Context, pl plog.Logs) error { - p.mux.Lock() - defer p.mux.Unlock() if p.isOTTL() { p.consumeLogsOTTL(ctx, pl) @@ -176,16 +173,12 @@ func (p *logCountProcessor) handleMetricInterval(ctx context.Context) { // sendMetrics sends metrics to the consumer. func (p *logCountProcessor) sendMetrics(ctx context.Context) { - p.mux.Lock() - defer p.mux.Unlock() metrics := p.createMetrics() if metrics.ResourceMetrics().Len() == 0 { return } - p.counter.Reset() - if err := routereceiver.RouteMetrics(ctx, p.config.Route, metrics); err != nil { p.logger.Error("Failed to send metrics", zap.Error(err)) } diff --git a/processor/spancountprocessor/processor.go b/processor/spancountprocessor/processor.go index d1ca72d3c..89881e776 100644 --- a/processor/spancountprocessor/processor.go +++ b/processor/spancountprocessor/processor.go @@ -43,7 +43,6 @@ type spanCountProcessor struct { logger *zap.Logger cancel context.CancelFunc wg sync.WaitGroup - mux sync.Mutex } // newProcessor returns a new processor. @@ -106,8 +105,6 @@ func (p *spanCountProcessor) Shutdown(_ context.Context) error { // ConsumeMetrics processes the metrics. func (p *spanCountProcessor) ConsumeTraces(ctx context.Context, t ptrace.Traces) error { - p.mux.Lock() - defer p.mux.Unlock() if p.isOTTL() { p.consumeTracesOTTL(ctx, t) @@ -184,16 +181,12 @@ func (p *spanCountProcessor) handleMetricInterval(ctx context.Context) { // sendMetrics sends metrics to the consumer. func (p *spanCountProcessor) sendMetrics(ctx context.Context) { - p.mux.Lock() - defer p.mux.Unlock() metrics := p.createMetrics() if metrics.ResourceMetrics().Len() == 0 { return } - p.counter.Reset() - if err := routereceiver.RouteMetrics(ctx, p.config.Route, metrics); err != nil { p.logger.Error("Failed to send metrics", zap.Error(err)) }