Skip to content

Commit

Permalink
fix(chstorage): properly map histogram exemplars
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed Dec 11, 2023
1 parent 92578d1 commit bae195f
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 28 deletions.
115 changes: 88 additions & 27 deletions internal/chstorage/inserter_metrics.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package chstorage

import (
"cmp"
"context"
"math"
"slices"
"strconv"
"time"

Expand Down Expand Up @@ -168,17 +171,6 @@ func (b *metricsBatch) addHistogramPoints(name string, res pcommon.Map, slice pm

b.addName(name)
b.addLabels(attrs)
if err := b.addExemplars(
exemplarSeries{
Name: name,
Timestamp: ts,
Attributes: attrs,
Resource: res,
},
point.Exemplars(),
); err != nil {
return errors.Wrap(err, "map exemplars")
}
// Save original histogram.
c.name.Append(name)
c.timestamp.Append(ts)
Expand Down Expand Up @@ -235,40 +227,109 @@ func (b *metricsBatch) addHistogramPoints(name string, res pcommon.Map, slice pm
)

var (
cumCount uint64
bucketName = name + "_bucket"
cumCount uint64
bucketName = name + "_bucket"
bucketBounds = make([]histogramBucketBounds, 0, len(bucketCounts))
)
for i := 0; i < min(len(bucketCounts), len(explicitBounds)); i++ {
bound := explicitBounds[i]
cumCount += bucketCounts[i]

key := [2]string{
"le",
strconv.FormatFloat(bound, 'f', -1, 64),
}
bucketBounds = append(bucketBounds, histogramBucketBounds{
bound: bound,
bucketKey: key,
})

// Generate series with "_bucket" suffix and "le" label.
b.addMappedSample(
series,
bucketName,
histogramBucket,
float64(cumCount),
[2]string{
"le",
strconv.FormatFloat(bound, 'f', -1, 64),
},
key,
)
}
// Generate series with "_bucket" suffix and "le" label.
b.addMappedSample(
series,
bucketName,
histogramBucket,
float64(cumCount),
[2]string{
{
key := [2]string{
"le",
"+Inf",
}
bucketBounds = append(bucketBounds, histogramBucketBounds{
bound: math.Inf(1),
bucketKey: key,
})

b.addMappedSample(
series,
bucketName,
histogramBucket,
float64(cumCount),
key,
)
}

if err := b.addHistogramExemplars(
exemplarSeries{
// Note: we're using the "_bucket" name, not the original.
Name: bucketName,
Timestamp: ts,
Attributes: attrs,
Resource: res,
},
)
point.Exemplars(),
bucketBounds,
); err != nil {
return errors.Wrap(err, "map exemplars")
}
}
return nil
}

func (b *metricsBatch) addHistogramExemplars(
p exemplarSeries,
exemplars pmetric.ExemplarSlice,
bounds []histogramBucketBounds,
) error {
slices.SortFunc(bounds, func(a, b histogramBucketBounds) int {
return cmp.Compare(a.bound, b.bound)
})
for i := 0; i < exemplars.Len(); i++ {
e := exemplars.At(i)
for _, bound := range bounds {
var val float64
switch typ := e.ValueType(); typ {
case pmetric.ExemplarValueTypeEmpty:
// Just ignore it.
return nil
case pmetric.ExemplarValueTypeInt:
// TODO(tdakkota): check for overflow
val = float64(e.IntValue())
case pmetric.ExemplarValueTypeDouble:
val = e.DoubleValue()
default:
return errors.Errorf("unexpected exemplar value type: %v", typ)
}

if val <= bound.bound {
if err := b.addExemplar(p, e, bound.bucketKey); err != nil {
return err
}
}
}
}
return nil
}

type histogramBucketBounds struct {
bound float64
bucketKey [2]string
}

func (b *metricsBatch) addExpHistogramPoints(name string, res pcommon.Map, slice pmetric.ExponentialHistogramDataPointSlice) error {
var (
c = b.expHistograms
Expand Down Expand Up @@ -423,14 +484,14 @@ type exemplarSeries struct {

func (b *metricsBatch) addExemplars(p exemplarSeries, exemplars pmetric.ExemplarSlice) error {
for i := 0; i < exemplars.Len(); i++ {
if err := b.addExemplar(p, exemplars.At(i)); err != nil {
if err := b.addExemplar(p, exemplars.At(i), [2]string{}); err != nil {
return err
}
}
return nil
}

func (b *metricsBatch) addExemplar(p exemplarSeries, e pmetric.Exemplar) error {
func (b *metricsBatch) addExemplar(p exemplarSeries, e pmetric.Exemplar, bucketKey [2]string) error {
c := b.exemplars

var val float64
Expand All @@ -456,7 +517,7 @@ func (b *metricsBatch) addExemplar(p exemplarSeries, e pmetric.Exemplar) error {
c.spanID.Append(e.SpanID())
c.traceID.Append(e.TraceID())

c.attributes.Append(encodeAttributes(p.Attributes))
c.attributes.Append(encodeAttributes(p.Attributes, bucketKey))
c.resource.Append(encodeAttributes(p.Resource))
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion internal/chstorage/querier_metrics_exemplars.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ import (
"github.com/ClickHouse/ch-go"
"github.com/ClickHouse/ch-go/proto"
"github.com/go-faster/errors"
"github.com/go-faster/oteldb/internal/otelstorage"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/go-faster/oteldb/internal/otelstorage"
)

var _ storage.ExemplarQueryable = (*Querier)(nil)
Expand Down

0 comments on commit bae195f

Please sign in to comment.