From bae195ffd9a2ed3cd3c45f64f2788a1c0f4f366a Mon Sep 17 00:00:00 2001 From: tdakkota Date: Mon, 11 Dec 2023 12:29:42 +0300 Subject: [PATCH] fix(chstorage): properly map histogram exemplars --- internal/chstorage/inserter_metrics.go | 115 ++++++++++++++---- .../chstorage/querier_metrics_exemplars.go | 3 +- 2 files changed, 90 insertions(+), 28 deletions(-) diff --git a/internal/chstorage/inserter_metrics.go b/internal/chstorage/inserter_metrics.go index fcf7be8b..846439ab 100644 --- a/internal/chstorage/inserter_metrics.go +++ b/internal/chstorage/inserter_metrics.go @@ -1,7 +1,10 @@ package chstorage import ( + "cmp" "context" + "math" + "slices" "strconv" "time" @@ -168,17 +171,6 @@ func (b *metricsBatch) addHistogramPoints(name string, res pcommon.Map, slice pm b.addName(name) b.addLabels(attrs) - if err := b.addExemplars( - exemplarSeries{ - Name: name, - Timestamp: ts, - Attributes: attrs, - Resource: res, - }, - point.Exemplars(), - ); err != nil { - return errors.Wrap(err, "map exemplars") - } // Save original histogram. c.name.Append(name) c.timestamp.Append(ts) @@ -235,40 +227,109 @@ func (b *metricsBatch) addHistogramPoints(name string, res pcommon.Map, slice pm ) var ( - cumCount uint64 - bucketName = name + "_bucket" + cumCount uint64 + bucketName = name + "_bucket" + bucketBounds = make([]histogramBucketBounds, 0, len(bucketCounts)) ) for i := 0; i < min(len(bucketCounts), len(explicitBounds)); i++ { bound := explicitBounds[i] cumCount += bucketCounts[i] + key := [2]string{ + "le", + strconv.FormatFloat(bound, 'f', -1, 64), + } + bucketBounds = append(bucketBounds, histogramBucketBounds{ + bound: bound, + bucketKey: key, + }) + // Generate series with "_bucket" suffix and "le" label. b.addMappedSample( series, bucketName, histogramBucket, float64(cumCount), - [2]string{ - "le", - strconv.FormatFloat(bound, 'f', -1, 64), - }, + key, ) } // Generate series with "_bucket" suffix and "le" label. - b.addMappedSample( - series, - bucketName, - histogramBucket, - float64(cumCount), - [2]string{ + { + key := [2]string{ "le", "+Inf", + } + bucketBounds = append(bucketBounds, histogramBucketBounds{ + bound: math.Inf(1), + bucketKey: key, + }) + + b.addMappedSample( + series, + bucketName, + histogramBucket, + float64(cumCount), + key, + ) + } + + if err := b.addHistogramExemplars( + exemplarSeries{ + // Note: we're using the "_bucket" name, not the original. + Name: bucketName, + Timestamp: ts, + Attributes: attrs, + Resource: res, }, - ) + point.Exemplars(), + bucketBounds, + ); err != nil { + return errors.Wrap(err, "map exemplars") + } } return nil } +func (b *metricsBatch) addHistogramExemplars( + p exemplarSeries, + exemplars pmetric.ExemplarSlice, + bounds []histogramBucketBounds, +) error { + slices.SortFunc(bounds, func(a, b histogramBucketBounds) int { + return cmp.Compare(a.bound, b.bound) + }) + for i := 0; i < exemplars.Len(); i++ { + e := exemplars.At(i) + for _, bound := range bounds { + var val float64 + switch typ := e.ValueType(); typ { + case pmetric.ExemplarValueTypeEmpty: + // Just ignore it. + return nil + case pmetric.ExemplarValueTypeInt: + // TODO(tdakkota): check for overflow + val = float64(e.IntValue()) + case pmetric.ExemplarValueTypeDouble: + val = e.DoubleValue() + default: + return errors.Errorf("unexpected exemplar value type: %v", typ) + } + + if val <= bound.bound { + if err := b.addExemplar(p, e, bound.bucketKey); err != nil { + return err + } + } + } + } + return nil +} + +type histogramBucketBounds struct { + bound float64 + bucketKey [2]string +} + func (b *metricsBatch) addExpHistogramPoints(name string, res pcommon.Map, slice pmetric.ExponentialHistogramDataPointSlice) error { var ( c = b.expHistograms @@ -423,14 +484,14 @@ type exemplarSeries struct { func (b *metricsBatch) addExemplars(p exemplarSeries, exemplars pmetric.ExemplarSlice) error { for i := 0; i < exemplars.Len(); i++ { - if err := b.addExemplar(p, exemplars.At(i)); err != nil { + if err := b.addExemplar(p, exemplars.At(i), [2]string{}); err != nil { return err } } return nil } -func (b *metricsBatch) addExemplar(p exemplarSeries, e pmetric.Exemplar) error { +func (b *metricsBatch) addExemplar(p exemplarSeries, e pmetric.Exemplar, bucketKey [2]string) error { c := b.exemplars var val float64 @@ -456,7 +517,7 @@ func (b *metricsBatch) addExemplar(p exemplarSeries, e pmetric.Exemplar) error { c.spanID.Append(e.SpanID()) c.traceID.Append(e.TraceID()) - c.attributes.Append(encodeAttributes(p.Attributes)) + c.attributes.Append(encodeAttributes(p.Attributes, bucketKey)) c.resource.Append(encodeAttributes(p.Resource)) return nil } diff --git a/internal/chstorage/querier_metrics_exemplars.go b/internal/chstorage/querier_metrics_exemplars.go index 9b7a78ca..85e423be 100644 --- a/internal/chstorage/querier_metrics_exemplars.go +++ b/internal/chstorage/querier_metrics_exemplars.go @@ -10,12 +10,13 @@ import ( "github.com/ClickHouse/ch-go" "github.com/ClickHouse/ch-go/proto" "github.com/go-faster/errors" - "github.com/go-faster/oteldb/internal/otelstorage" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" + + "github.com/go-faster/oteldb/internal/otelstorage" ) var _ storage.ExemplarQueryable = (*Querier)(nil)