From 528ee55cdaf1945578693fdcc036a3162c3ea9ff Mon Sep 17 00:00:00 2001 From: Vish Ramachandran Date: Tue, 12 Nov 2024 08:36:05 -0800 Subject: [PATCH] More docs and small cleanup --- .../format/vectors/Histogram.scala | 90 ++++++++++++++----- 1 file changed, 66 insertions(+), 24 deletions(-) diff --git a/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala b/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala index 73b453db94..1846d4fa47 100644 --- a/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala +++ b/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala @@ -570,9 +570,10 @@ object HistogramBuckets { val scale = UnsafeUtils.getShort(bucketsDefBase, bucketsDefOffset + OffsetBucketDetails) val startPosBucket = UnsafeUtils.getShort(bucketsDefBase, bucketsDefOffset + OffsetBucketDetails + 2) val numPosBuckets = UnsafeUtils.getShort(bucketsDefBase, bucketsDefOffset + OffsetBucketDetails + 4) - val startNegBucket = UnsafeUtils.getShort(bucketsDefBase, bucketsDefOffset + OffsetBucketDetails + 6) - val numNegBuckets = UnsafeUtils.getShort(bucketsDefBase, bucketsDefOffset + OffsetBucketDetails + 8) - Base2ExpHistogramBuckets(scale, startPosBucket, numPosBuckets, startNegBucket, numNegBuckets) + // uncomment when negative buckets are supported + // val startNegBucket = UnsafeUtils.getShort(bucketsDefBase, bucketsDefOffset + OffsetBucketDetails + 6) + // val numNegBuckets = UnsafeUtils.getShort(bucketsDefBase, bucketsDefOffset + OffsetBucketDetails + 8) + Base2ExpHistogramBuckets(scale, startPosBucket, numPosBuckets) } /** @@ -642,30 +643,26 @@ object Base2ExpHistogramBuckets { * See https://github.com/open-telemetry/opentelemetry-proto/blob/7312bdf63218acf27fe96430b7231de37fd091f2/ * opentelemetry/proto/metrics/v1/metrics.proto#L500 * for the definition. + * + * Easier explanation here: https://dyladan.me/histograms/2023/05/04/exponential-histograms/ + * * The bucket scheme is based on the formula: * `bucket(index) = base ^ (index + 1)` * where index is the bucket index, and `base = 2 ^ 2 ^ -scale` * - * Negative buckets are not supported yet, so startIndexNegativeBuckets and numNegativeBuckets are always 0. - * They are here since the proto has it (but the client SDKs do not), and we want to be able to support it - * when the spec changes + * Negative observations are not supported yet. But serialization format is forward compatible. * - * @param scale - * @param startIndexPositiveBuckets - * @param numPositiveBuckets - * @param startIndexNegativeBuckets negative buckets are not supported yet, so this is always 0 - * @param numNegativeBuckets negative buckets are not supported yet, so this is always 0 + * @param scale OTel metrics ExponentialHistogramDataPoint proto scale value + * @param startIndexPositiveBuckets offset for positive buckets from the same proto + * @param numPositiveBuckets length of the positive buckets array from the same proto */ final case class Base2ExpHistogramBuckets(scale: Int, startIndexPositiveBuckets: Int, - numPositiveBuckets: Int, - startIndexNegativeBuckets: Int = 0, - numNegativeBuckets: Int = 0 + numPositiveBuckets: Int ) extends HistogramBuckets { import Base2ExpHistogramBuckets._ require(numPositiveBuckets <= maxBuckets && numPositiveBuckets >= 0, s"Invalid buckets: numPositiveBuckets=$numPositiveBuckets maxBuckets=${maxBuckets}") - require(numNegativeBuckets == 0 && startIndexNegativeBuckets == 0, "Negative buckets not supported yet") // limit bucket index and scale values since the corresponding bucketTop values can get very large or very small and // may tend to very very large or small values above double precision. Note we are also serializing // these as shorts in the binary format, so they need to be within limits of short @@ -681,8 +678,6 @@ final case class Base2ExpHistogramBuckets(scale: Int, override def numBuckets: Int = numPositiveBuckets + 1 // add one for zero count - def bucketIndexToArrayIndex(index: Int): Int = index - startIndexPositiveBuckets + 1 - final def bucketTop(no: Int): Double = { if (no ==0) 0.0 else { // From OTel metrics proto docs: @@ -695,6 +690,12 @@ final case class Base2ExpHistogramBuckets(scale: Int, } final def serialize(buf: MutableDirectBuffer, pos: Int): Int = { + + // Negative observations are not supported yet, so startIndexNegativeBuckets and numNegativeBuckets are always 0. + // They are here since the proto has it (but the client SDKs do not), and we want to be able to support it + // when the spec changes + val startIndexNegativeBuckets = 0 + val numNegativeBuckets = 0 require(numBuckets < Short.MaxValue, s"numBucket overflow: $numBuckets") require(startIndexPositiveBuckets < Short.MaxValue, s"startIndexPositiveBuckets overflow: $startIndexPositiveBuckets") @@ -722,9 +723,9 @@ final case class Base2ExpHistogramBuckets(scale: Int, override def similarForMath(other: HistogramBuckets): Boolean = { other match { case c: Base2ExpHistogramBuckets => this.scale == c.scale && - this.startIndexPositiveBuckets == c.startIndexPositiveBuckets && - this.numPositiveBuckets == c.numPositiveBuckets - case _ => false + this.startIndexPositiveBuckets == c.startIndexPositiveBuckets && + this.numPositiveBuckets == c.numPositiveBuckets + case _ => false } } @@ -745,6 +746,8 @@ final case class Base2ExpHistogramBuckets(scale: Int, var newBase = Math.max(base, o.base) var newBucketIndexEnd = Math.ceil(Math.log(maxBucketTopNeeded) / Math.log(newBase)).toInt - 1 var newBucketIndexStart = Math.floor(Math.log(minBucketTopNeeded) / Math.log(newBase)).toInt - 1 + // Even if the two schemes are of same scale, they can have non-overlapping bucket ranges. + // The new bucket scheme should have at most maxBuckets, so keep reducing scale until within limits. while (newBucketIndexEnd - newBucketIndexStart > maxBuckets) { newScale -= 1 newBase = Math.pow(2, Math.pow(2, -newScale)) @@ -755,6 +758,14 @@ final case class Base2ExpHistogramBuckets(scale: Int, } } + /** + * Converts an OTel exponential index to array index. + * For example if startIndexPositiveBuckets = -5 and numPositiveBuckets = 10, then + * -5 will return 1, -4 will return 2, 0 will return 6, 4 will return 10. + * Know that 0 array index is reserved for zero bucket. + */ + def bucketIndexToArrayIndex(index: Int): Int = index - startIndexPositiveBuckets + 1 + /** * Add the otherValues using otherBuckets scheme to ourValues. This method assumes (a) ourValues uses "this" * bucket scheme and (a) this bucket scheme can accommodate otherBuckets. @@ -762,28 +773,59 @@ final case class Base2ExpHistogramBuckets(scale: Int, def addValues(ourValues: Array[Double], otherBuckets: Base2ExpHistogramBuckets, otherHistogram: HistogramWithBuckets): Unit = { - // TODO remove the require once code is stable + // TODO consider removing the require once code is stable require(ourValues.length == numBuckets) require(otherHistogram.numBuckets == otherBuckets.numBuckets) require(canAccommodate(otherBuckets)) + val scaleIncrease = otherBuckets.scale - scale // for each ourValues, find the bucket index in otherValues and add the value - val fac = Math.pow(2 , otherBuckets.scale - scale).toInt + val fac = Math.pow(2 , scaleIncrease).toInt ourValues(0) += otherHistogram.bucketValue(0) // add the zero bucket + // For this algorithm we take advantage of the pattern that there is a relationship between scale + // and the bucketIndexPlus1. When scale increases by scaleIncrease, + // then the otherBucketIndexPlus1 for the same bucket in the other scheme + // is 2^scaleIncrease (which is fac) times otherBucketIndexPlus1. cforRange { startIndexPositiveBuckets until startIndexPositiveBuckets + numPositiveBuckets } { ourBucketIndex => val ourBucketIndexPlus1 = ourBucketIndex + 1 val otherBucketIndexPlus1 = ourBucketIndexPlus1 * fac val ourArrayIndex = bucketIndexToArrayIndex(ourBucketIndexPlus1 - 1) + // ourArrayIndex is guaranteed to be within bounds since we are iterating our bucket range + val otherArrayIndex = otherBuckets.bucketIndexToArrayIndex(otherBucketIndexPlus1 - 1) - if (otherArrayIndex > 0 && otherArrayIndex < otherBuckets.numBuckets) { - // TODO remove this require once code is stable + // there is a possibility that otherArrayIndex is out of bounds, so we need to check + + if (otherArrayIndex > 0 && otherArrayIndex < otherBuckets.numBuckets) { // add if within bounds + + // Example 1: add every 2nd bucket from other to our bucket + // other scale = 2 . . . . . . . . . . . + // ↓ ↓ ↓ ↓ ↓ ↓ + // our scale = 1 . . . . . . + // + // Example 2: add every 4th bucket from other to our bucket + // other scale = 3 . . . . . . . . . . . . . + // ↓ ↓ ↓ ↓ + // our scale = 1 . . . . + + // TODO consider removing the require once code is stable require(ourArrayIndex != 0, "double counting zero bucket") require( Math.abs(bucketTop(ourArrayIndex) - otherBuckets.bucketTop(otherArrayIndex)) <= 0.000001, s"Bucket tops of $ourArrayIndex and $otherArrayIndex don't match:" + s" ${bucketTop(ourArrayIndex)} != ${otherBuckets.bucketTop(otherArrayIndex)}") ourValues(ourArrayIndex) += otherHistogram.bucketValue(otherArrayIndex) } else if (otherArrayIndex >= otherBuckets.numBuckets) { + // if otherArrayIndex higher than upper bound, add to our last bucket since bucket counts are cumulative + + // Example 3: add every 4th bucket from other to our + // other scale = 4 . . . . . . . . . . . . X add last bucket X here + // ↓ ↓ ↓ ↓ ↓ ↓ + // our scale = 2 . . . . . . + + // TODO consider removing the require once code is stable + require(ourArrayIndex != 0, "double counting zero bucket") ourValues(ourArrayIndex) += otherHistogram.bucketValue(otherBuckets.numBuckets - 1) } + // if otherArrayIndex is less than lower bound, the counts are already included and we don't need + // to add since the counts are cumulative } } }