Skip to content

Commit

Permalink
Replace mutex with channel for TelemetryCounter
Browse files Browse the repository at this point in the history
  • Loading branch information
shazlehu committed Nov 20, 2024
1 parent 1829e93 commit 73787ab
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 52 deletions.
60 changes: 36 additions & 24 deletions counter/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,45 @@ import "encoding/json"
// TelemetryCounter tracks the number of times a set of resource and attribute dimensions have been seen.
type TelemetryCounter struct {
resources map[string]*ResourceCounter
commands chan func()
}

// NewTelemetryCounter creates a new TelemetryCounter.
func NewTelemetryCounter() *TelemetryCounter {
return &TelemetryCounter{
t := &TelemetryCounter{
resources: make(map[string]*ResourceCounter),
commands: make(chan func()),
}
go t.run()
return t
}

// Add increments the counter with the supplied dimensions.
func (t *TelemetryCounter) Add(resource, attributes map[string]any) {
key := getDimensionKey(resource)
if _, ok := t.resources[key]; !ok {
t.resources[key] = NewResourceCounter(resource)
// run listens for commands to modify or read the resources.
func (t *TelemetryCounter) run() {
for cmd := range t.commands {
cmd()
}

t.resources[key].Add(attributes)
}

// Resources returns a map of resource ID to a counter for that resource.
func (t TelemetryCounter) Resources() map[string]*ResourceCounter {
return t.resources
// Add increments the counter with the supplied dimensions.
func (t *TelemetryCounter) Add(resource, attributes map[string]any) {
t.commands <- func() {
key := getDimensionKey(resource)
if _, ok := t.resources[key]; !ok {
t.resources[key] = newResourceCounter(resource)
}
t.resources[key].add(attributes)
}
}

// Reset resets the counter.
func (t *TelemetryCounter) Reset() {
t.resources = make(map[string]*ResourceCounter)
// Resources returns a map of resource ID to a counter for that resource and resets the counter.
func (t *TelemetryCounter) Resources() map[string]*ResourceCounter {
result := make(chan map[string]*ResourceCounter)
t.commands <- func() {
result <- t.resources
t.resources = make(map[string]*ResourceCounter) // Reset the counter
}
return <-result
}

// ResourceCounter dimensions the counter by resource.
Expand All @@ -55,22 +67,22 @@ type ResourceCounter struct {
attributes map[string]*AttributeCounter
}

// NewResourceCounter creates a new ResourceCounter.
func NewResourceCounter(values map[string]any) *ResourceCounter {
// newResourceCounter creates a new ResourceCounter.
func newResourceCounter(values map[string]any) *ResourceCounter {
return &ResourceCounter{
values: values,
attributes: map[string]*AttributeCounter{},
}
}

// Add increments the counter with the supplied dimensions.
func (r *ResourceCounter) Add(attributes map[string]any) {
// add increments the counter with the supplied dimensions.
func (r *ResourceCounter) add(attributes map[string]any) {
key := getDimensionKey(attributes)
if _, ok := r.attributes[key]; !ok {
r.attributes[key] = NewAttributeCounter(attributes)
r.attributes[key] = newAttributeCounter(attributes)
}

r.attributes[key].Add()
r.attributes[key].add()
}

// Attributes returns a map of attribute set ID to a counter for that attribute set.
Expand All @@ -89,15 +101,15 @@ type AttributeCounter struct {
count int
}

// NewAttributeCounter creates a new AttributeCounter.
func NewAttributeCounter(values map[string]any) *AttributeCounter {
// newAttributeCounter creates a new AttributeCounter.
func newAttributeCounter(values map[string]any) *AttributeCounter {
return &AttributeCounter{
values: values,
}
}

// Add increments the counter.
func (a *AttributeCounter) Add() {
// add increments the counter.
func (a *AttributeCounter) add() {
a.count++
}

Expand Down
13 changes: 8 additions & 5 deletions counter/counter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,13 @@ func TestLogCounter(t *testing.T) {
attrKey1 := getDimensionKey(attrMap1)
attrKey2 := getDimensionKey(attrMap2)

require.Equal(t, 10, counter.resources[resourceKey1].attributes[attrKey1].count)
require.Equal(t, 5, counter.resources[resourceKey1].attributes[attrKey2].count)
require.Equal(t, 1, counter.resources[resourceKey2].attributes[attrKey1].count)
resources := counter.Resources()

counter.Reset()
require.Len(t, counter.resources, 0)
require.Equal(t, 10, resources[resourceKey1].attributes[attrKey1].count)
require.Equal(t, 5, resources[resourceKey1].attributes[attrKey2].count)
require.Equal(t, 1, resources[resourceKey2].attributes[attrKey1].count)

// Ensure that the counter has been reset
resources = counter.Resources()
require.Len(t, resources, 0)
}
9 changes: 0 additions & 9 deletions processor/datapointcountprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ type metricCountProcessor struct {
logger *zap.Logger
cancel context.CancelFunc
wg sync.WaitGroup
mux sync.Mutex
}

// newExprProcessor returns a new processor with expr expressions.
Expand Down Expand Up @@ -110,8 +109,6 @@ func (p *metricCountProcessor) Shutdown(_ context.Context) error {

// ConsumeMetrics processes the metrics.
func (p *metricCountProcessor) ConsumeMetrics(ctx context.Context, m pmetric.Metrics) error {
p.mux.Lock()
defer p.mux.Unlock()

if p.isOTTL() {
p.consumeMetricsOTTL(ctx, m)
Expand Down Expand Up @@ -202,9 +199,6 @@ func (p *metricCountProcessor) sendMetrics(ctx context.Context) {

// createMetrics creates metrics from the counter. The counter is reset after the metrics are created.
func (p *metricCountProcessor) createMetrics() pmetric.Metrics {
p.mux.Lock()
defer p.mux.Unlock()

metrics := pmetric.NewMetrics()
for _, resource := range p.counter.Resources() {
resourceMetrics := metrics.ResourceMetrics().AppendEmpty()
Expand All @@ -231,9 +225,6 @@ func (p *metricCountProcessor) createMetrics() pmetric.Metrics {

}
}

p.counter.Reset()

return metrics
}

Expand Down
7 changes: 0 additions & 7 deletions processor/logcountprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ type logCountProcessor struct {
logger *zap.Logger
cancel context.CancelFunc
wg sync.WaitGroup
mux sync.Mutex
}

// newProcessor returns a new processor.
Expand Down Expand Up @@ -105,8 +104,6 @@ func (p *logCountProcessor) Shutdown(_ context.Context) error {

// ConsumeLogs processes the logs.
func (p *logCountProcessor) ConsumeLogs(ctx context.Context, pl plog.Logs) error {
p.mux.Lock()
defer p.mux.Unlock()

if p.isOTTL() {
p.consumeLogsOTTL(ctx, pl)
Expand Down Expand Up @@ -176,16 +173,12 @@ func (p *logCountProcessor) handleMetricInterval(ctx context.Context) {

// sendMetrics sends metrics to the consumer.
func (p *logCountProcessor) sendMetrics(ctx context.Context) {
p.mux.Lock()
defer p.mux.Unlock()

metrics := p.createMetrics()
if metrics.ResourceMetrics().Len() == 0 {
return
}

p.counter.Reset()

if err := routereceiver.RouteMetrics(ctx, p.config.Route, metrics); err != nil {
p.logger.Error("Failed to send metrics", zap.Error(err))
}
Expand Down
7 changes: 0 additions & 7 deletions processor/spancountprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ type spanCountProcessor struct {
logger *zap.Logger
cancel context.CancelFunc
wg sync.WaitGroup
mux sync.Mutex
}

// newProcessor returns a new processor.
Expand Down Expand Up @@ -106,8 +105,6 @@ func (p *spanCountProcessor) Shutdown(_ context.Context) error {

// ConsumeMetrics processes the metrics.
func (p *spanCountProcessor) ConsumeTraces(ctx context.Context, t ptrace.Traces) error {
p.mux.Lock()
defer p.mux.Unlock()

if p.isOTTL() {
p.consumeTracesOTTL(ctx, t)
Expand Down Expand Up @@ -184,16 +181,12 @@ func (p *spanCountProcessor) handleMetricInterval(ctx context.Context) {

// sendMetrics sends metrics to the consumer.
func (p *spanCountProcessor) sendMetrics(ctx context.Context) {
p.mux.Lock()
defer p.mux.Unlock()

metrics := p.createMetrics()
if metrics.ResourceMetrics().Len() == 0 {
return
}

p.counter.Reset()

if err := routereceiver.RouteMetrics(ctx, p.config.Route, metrics); err != nil {
p.logger.Error("Failed to send metrics", zap.Error(err))
}
Expand Down

0 comments on commit 73787ab

Please sign in to comment.