Skip to content

Commit

Permalink
Add TelemetryCounter.Stop()
Browse files Browse the repository at this point in the history
  • Loading branch information
shazlehu committed Nov 20, 2024
1 parent 73787ab commit 62d4482
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 2 deletions.
17 changes: 15 additions & 2 deletions counter/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,38 @@ import "encoding/json"
type TelemetryCounter struct {
resources map[string]*ResourceCounter
commands chan func()
done chan struct{}
}

// NewTelemetryCounter creates a new TelemetryCounter.
func NewTelemetryCounter() *TelemetryCounter {
t := &TelemetryCounter{
resources: make(map[string]*ResourceCounter),
commands: make(chan func()),
done: make(chan struct{}),
}
go t.run()
return t
}

// run listens for commands to modify or read the resources.
func (t *TelemetryCounter) run() {
for cmd := range t.commands {
cmd()
for {
select {
case cmd := <-t.commands:
cmd() // Execute the command
case <-t.done:
// Shutdown signal received, exit the loop
return
}
}
}

// Stop gracefully shuts down the TelemetryCounter.
func (t *TelemetryCounter) Stop() {
close(t.done)
}

// Add increments the counter with the supplied dimensions.
func (t *TelemetryCounter) Add(resource, attributes map[string]any) {
t.commands <- func() {
Expand Down
1 change: 1 addition & 0 deletions processor/datapointcountprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (p *metricCountProcessor) Shutdown(_ context.Context) error {
p.cancel()
}
p.wg.Wait()
p.counter.Stop()
return nil
}

Expand Down
1 change: 1 addition & 0 deletions processor/logcountprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func (p *logCountProcessor) Shutdown(_ context.Context) error {
p.cancel()
}
p.wg.Wait()
p.counter.Stop()
return nil
}

Expand Down
1 change: 1 addition & 0 deletions processor/spancountprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func (p *spanCountProcessor) Shutdown(_ context.Context) error {
p.cancel()
}
p.wg.Wait()
p.counter.Stop()
return nil
}

Expand Down

0 comments on commit 62d4482

Please sign in to comment.