Skip to content

Commit

Permalink
Process only changed metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
shaan420 committed Oct 25, 2023
1 parent 5cee107 commit 7653526
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 39 deletions.
86 changes: 66 additions & 20 deletions scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ type scope struct {
done chan struct{}
wg sync.WaitGroup
root bool

counterChangeNotifyCh chan *counter
gaugeChangeNotifyCh chan *gauge
changedCounters []*counter
changedGauges []*gauge
}

// ScopeOptions is a set of options to construct a scope.
Expand Down Expand Up @@ -166,23 +171,27 @@ func newRootScope(opts ScopeOptions, interval time.Duration) *scope {
}

s := &scope{
baseReporter: baseReporter,
bucketCache: newBucketCache(),
cachedReporter: opts.CachedReporter,
counters: make(map[string]*counter),
countersSlice: make([]*counter, 0, _defaultInitialSliceSize),
defaultBuckets: opts.DefaultBuckets,
done: make(chan struct{}),
gauges: make(map[string]*gauge),
gaugesSlice: make([]*gauge, 0, _defaultInitialSliceSize),
histograms: make(map[string]*histogram),
histogramsSlice: make([]*histogram, 0, _defaultInitialSliceSize),
prefix: sanitizer.Name(opts.Prefix),
reporter: opts.Reporter,
sanitizer: sanitizer,
separator: sanitizer.Name(opts.Separator),
timers: make(map[string]*timer),
root: true,
baseReporter: baseReporter,
bucketCache: newBucketCache(),
cachedReporter: opts.CachedReporter,
counters: make(map[string]*counter),
countersSlice: make([]*counter, 0, _defaultInitialSliceSize),
defaultBuckets: opts.DefaultBuckets,
done: make(chan struct{}),
gauges: make(map[string]*gauge),
gaugesSlice: make([]*gauge, 0, _defaultInitialSliceSize),
histograms: make(map[string]*histogram),
histogramsSlice: make([]*histogram, 0, _defaultInitialSliceSize),
prefix: sanitizer.Name(opts.Prefix),
reporter: opts.Reporter,
sanitizer: sanitizer,
separator: sanitizer.Name(opts.Separator),
timers: make(map[string]*timer),
root: true,
counterChangeNotifyCh: make(chan *counter, 1024),
gaugeChangeNotifyCh: make(chan *gauge, 1024),
changedCounters: make([]*counter, 0, _defaultInitialSliceSize),
changedGauges: make([]*gauge, 0, _defaultInitialSliceSize),
}

// NB(r): Take a copy of the tags on creation
Expand All @@ -196,7 +205,7 @@ func newRootScope(opts ScopeOptions, interval time.Duration) *scope {
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.reportLoop(interval)
s.processLoop(interval)
}()
}

Expand Down Expand Up @@ -281,6 +290,35 @@ func (s *scope) reportRegistry() {
}
}

func (s *scope) processLoop(interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case c := <-s.counterChangeNotifyCh:
s.changedCounters = append(s.changedCounters, c)
case g := <-s.gaugeChangeNotifyCh:
s.changedGauges = append(s.changedGauges, g)
case <-ticker.C:
s.reportChanges(s.changedCounters, s.changedGauges)
// Reset the changed counters and gauges
s.changedCounters = s.changedCounters[:0]
s.changedGauges = s.changedGauges[:0]
default:
return
}
}
}

func (s *scope) reportChanges(counters []*counter, gauges []*gauge) {
for _, c := range counters {
c.cachedReport()
}
for _, g := range gauges {
g.cachedReport()
}
}

func (s *scope) Counter(name string) Counter {
name = s.sanitizer.Name(name)
if c, ok := s.counter(name); ok {
Expand All @@ -295,14 +333,18 @@ func (s *scope) Counter(name string) Counter {
}

var cachedCounter CachedCount
var changeNotifyFn func(c *counter)
if s.cachedReporter != nil {
cachedCounter = s.cachedReporter.AllocateCounter(
s.fullyQualifiedName(name),
s.tags,
)
changeNotifyFn = func(c *counter) {
s.counterChangeNotifyCh <- c
}
}

c := newCounter(cachedCounter)
c := newCounter(cachedCounter, changeNotifyFn)
s.counters[name] = c
s.countersSlice = append(s.countersSlice, c)

Expand Down Expand Up @@ -331,13 +373,17 @@ func (s *scope) Gauge(name string) Gauge {
}

var cachedGauge CachedGauge
var changeNotifyFn func(g *gauge)
if s.cachedReporter != nil {
cachedGauge = s.cachedReporter.AllocateGauge(
s.fullyQualifiedName(name), s.tags,
)
changeNotifyFn = func(g *gauge) {
s.gaugeChangeNotifyCh <- g
}
}

g := newGauge(cachedGauge)
g := newGauge(cachedGauge, changeNotifyFn)
s.gauges[name] = g
s.gaugesSlice = append(s.gaugesSlice, g)

Expand Down
3 changes: 3 additions & 0 deletions scope_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ func (r *scopeRegistry) Subscope(parent *scope, prefix string, tags map[string]s
timers: make(map[string]*timer),
bucketCache: parent.bucketCache,
done: make(chan struct{}),

counterChangeNotifyCh: parent.counterChangeNotifyCh,
gaugeChangeNotifyCh: parent.gaugeChangeNotifyCh,
}
subscopeBucket.s[key] = subscope
if _, ok := r.lockedLookup(subscopeBucket, preSanitizeKey); !ok {
Expand Down
60 changes: 43 additions & 17 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,30 @@ func (c *capabilities) Tagging() bool {
}

type counter struct {
prev int64
curr int64
cachedCount CachedCount
}

func newCounter(cachedCount CachedCount) *counter {
return &counter{cachedCount: cachedCount}
prev int64
curr int64
updated uint64
cachedCount CachedCount
changeNotifyFn func(c *counter)
}

func newCounter(
cachedCount CachedCount,
changeNotifyFn func(c *counter),
) *counter {
return &counter{
cachedCount: cachedCount,
changeNotifyFn: changeNotifyFn,
}
}

func (c *counter) Inc(v int64) {
atomic.AddInt64(&c.curr, v)
if c.changeNotifyFn != nil {
if atomic.SwapUint64(&c.updated, 1) == 0 {
c.changeNotifyFn(c)
}
}
}

func (c *counter) value() int64 {
Expand Down Expand Up @@ -99,26 +112,39 @@ func (c *counter) cachedReport() {
return
}

c.cachedCount.ReportCount(delta)
if atomic.SwapUint64(&c.updated, 0) == 1 {
c.cachedCount.ReportCount(delta)
}
}

func (c *counter) snapshot() int64 {
return atomic.LoadInt64(&c.curr) - atomic.LoadInt64(&c.prev)
}

type gauge struct {
updated uint64
curr uint64
cachedGauge CachedGauge
}

func newGauge(cachedGauge CachedGauge) *gauge {
return &gauge{cachedGauge: cachedGauge}
updated uint64
curr uint64
cachedGauge CachedGauge
changeNotifyFn func(g *gauge)
}

func newGauge(
cachedGauge CachedGauge,
changeNotifyFn func(g *gauge),
) *gauge {
return &gauge{
cachedGauge: cachedGauge,
changeNotifyFn: changeNotifyFn,
}
}

func (g *gauge) Update(v float64) {
atomic.StoreUint64(&g.curr, math.Float64bits(v))
atomic.StoreUint64(&g.updated, 1)
if atomic.SwapUint64(&g.updated, 1) == 0 {
if g.changeNotifyFn != nil {
g.changeNotifyFn(g)
}
}
}

func (g *gauge) value() float64 {
Expand Down Expand Up @@ -297,7 +323,7 @@ func newHistogram(
}

for i := range h.samples {
h.samples[i].counter = newCounter(nil)
h.samples[i].counter = newCounter(nil, nil)

if cachedHistogram != nil {
switch htype {
Expand Down
4 changes: 2 additions & 2 deletions stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (r *statsTestReporter) Capabilities() Capabilities {
func (r *statsTestReporter) Flush() {}

func TestCounter(t *testing.T) {
counter := newCounter(nil)
counter := newCounter(nil, nil)
r := newStatsTestReporter()

counter.Inc(1)
Expand All @@ -101,7 +101,7 @@ func TestCounter(t *testing.T) {
}

func TestGauge(t *testing.T) {
gauge := newGauge(nil)
gauge := newGauge(nil, nil)
r := newStatsTestReporter()

gauge.Update(42)
Expand Down

0 comments on commit 7653526

Please sign in to comment.