
Commit

More docs and small cleanup
vishramachandran committed Nov 12, 2024
1 parent ff390d2 commit 5dce636
Showing 1 changed file with 66 additions and 24 deletions.
90 changes: 66 additions & 24 deletions memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala
@@ -564,9 +564,10 @@ object HistogramBuckets {
val scale = UnsafeUtils.getShort(bucketsDefBase, bucketsDefOffset + OffsetBucketDetails)
val startPosBucket = UnsafeUtils.getShort(bucketsDefBase, bucketsDefOffset + OffsetBucketDetails + 2)
val numPosBuckets = UnsafeUtils.getShort(bucketsDefBase, bucketsDefOffset + OffsetBucketDetails + 4)
val startNegBucket = UnsafeUtils.getShort(bucketsDefBase, bucketsDefOffset + OffsetBucketDetails + 6)
val numNegBuckets = UnsafeUtils.getShort(bucketsDefBase, bucketsDefOffset + OffsetBucketDetails + 8)
Base2ExpHistogramBuckets(scale, startPosBucket, numPosBuckets, startNegBucket, numNegBuckets)
// uncomment when negative buckets are supported
// val startNegBucket = UnsafeUtils.getShort(bucketsDefBase, bucketsDefOffset + OffsetBucketDetails + 6)
// val numNegBuckets = UnsafeUtils.getShort(bucketsDefBase, bucketsDefOffset + OffsetBucketDetails + 8)
Base2ExpHistogramBuckets(scale, startPosBucket, numPosBuckets)
}
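For clarity, here is a hedged sketch (not part of this commit) of the write side implied by the reader above: five consecutive shorts, in the order scale, positive start index, positive bucket count, then the two reserved negative-bucket fields. The helper name, the Int offset parameter, and the byte-order handling are assumptions drawn only from the getShort calls shown; any surrounding header bytes are out of scope here.

  // Sketch only: mirrors the field order read above; assumes the Agrona MutableDirectBuffer already used by serialize()
  def writeBucketsDefSketch(buf: MutableDirectBuffer, off: Int, b: Base2ExpHistogramBuckets): Unit = {
    buf.putShort(off, b.scale.toShort)                            // same position the reader's first getShort uses
    buf.putShort(off + 2, b.startIndexPositiveBuckets.toShort)
    buf.putShort(off + 4, b.numPositiveBuckets.toShort)
    buf.putShort(off + 6, 0.toShort)                              // startNegBucket: reserved, always 0 for now
    buf.putShort(off + 8, 0.toShort)                              // numNegBuckets: reserved, always 0 for now
  }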

/**
@@ -636,30 +637,26 @@ object Base2ExpHistogramBuckets {
* See https://github.com/open-telemetry/opentelemetry-proto/blob/7312bdf63218acf27fe96430b7231de37fd091f2/
* opentelemetry/proto/metrics/v1/metrics.proto#L500
* for the definition.
*
* Easier explanation here: https://dyladan.me/histograms/2023/05/04/exponential-histograms/
*
* The bucket scheme is based on the formula:
* `bucket(index) = base ^ (index + 1)`
* where index is the bucket index, and `base = 2 ^ (2 ^ -scale)`
*
* Negative buckets are not supported yet, so startIndexNegativeBuckets and numNegativeBuckets are always 0.
* They are here since the proto has it (but the client SDKs do not), and we want to be able to support it
* when the spec changes
* Negative observations are not supported yet, but the serialization format is forward compatible.
*
* @param scale
* @param startIndexPositiveBuckets
* @param numPositiveBuckets
* @param startIndexNegativeBuckets negative buckets are not supported yet, so this is always 0
* @param numNegativeBuckets negative buckets are not supported yet, so this is always 0
* @param scale OTel metrics ExponentialHistogramDataPoint proto scale value
* @param startIndexPositiveBuckets offset for positive buckets from the same proto
* @param numPositiveBuckets length of the positive buckets array from the same proto
*/
final case class Base2ExpHistogramBuckets(scale: Int,
startIndexPositiveBuckets: Int,
numPositiveBuckets: Int,
startIndexNegativeBuckets: Int = 0,
numNegativeBuckets: Int = 0
numPositiveBuckets: Int
) extends HistogramBuckets {
import Base2ExpHistogramBuckets._
require(numPositiveBuckets <= maxBuckets && numPositiveBuckets >= 0,
s"Invalid buckets: numPositiveBuckets=$numPositiveBuckets maxBuckets=${maxBuckets}")
require(numNegativeBuckets == 0 && startIndexNegativeBuckets == 0, "Negative buckets not supported yet")
// Limit bucket index and scale values since the corresponding bucketTop values can get very large or very small,
// potentially beyond double precision. Note we are also serializing
// these as shorts in the binary format, so they need to be within the limits of Short
@@ -675,8 +672,6 @@ final case class Base2ExpHistogramBuckets(scale: Int,

override def numBuckets: Int = numPositiveBuckets + 1 // add one for zero count

def bucketIndexToArrayIndex(index: Int): Int = index - startIndexPositiveBuckets + 1

final def bucketTop(no: Int): Double = {
if (no == 0) 0.0 else {
// From OTel metrics proto docs:
@@ -689,6 +684,12 @@ final case class Base2ExpHistogramBuckets(scale: Int,
}
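A small illustrative check (not part of this commit) of the bucket-top formula from the class docs above, `bucketTop = base ^ (index + 1)` with `base = 2 ^ (2 ^ -scale)`; the numbers are purely for illustration:

  // With scale = 3, base = 2^(1/8) ≈ 1.0905; OTel bucket index 7 then has an upper bound of exactly 2.0,
  // because (7 + 1) * 2^-3 == 1.
  val base3 = Math.pow(2, Math.pow(2, -3.0))   // ≈ 1.0905
  val topOfIndex7 = Math.pow(base3, 7 + 1)     // == 2.0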

final def serialize(buf: MutableDirectBuffer, pos: Int): Int = {

// Negative observations are not supported yet, so startIndexNegativeBuckets and numNegativeBuckets are always 0.
// They are here since the proto has them (though the client SDKs do not), and we want to be able to support them
// when the spec changes
val startIndexNegativeBuckets = 0
val numNegativeBuckets = 0
require(numBuckets < Short.MaxValue, s"numBuckets overflow: $numBuckets")
require(startIndexPositiveBuckets < Short.MaxValue,
s"startIndexPositiveBuckets overflow: $startIndexPositiveBuckets")
@@ -716,9 +717,9 @@ final case class Base2ExpHistogramBuckets(scale: Int,
override def similarForMath(other: HistogramBuckets): Boolean = {
other match {
case c: Base2ExpHistogramBuckets => this.scale == c.scale &&
this.startIndexPositiveBuckets == c.startIndexPositiveBuckets &&
this.numPositiveBuckets == c.numPositiveBuckets
case _ => false
}
}

@@ -739,6 +740,8 @@ final case class Base2ExpHistogramBuckets(scale: Int,
var newBase = Math.max(base, o.base)
var newBucketIndexEnd = Math.ceil(Math.log(maxBucketTopNeeded) / Math.log(newBase)).toInt - 1
var newBucketIndexStart = Math.floor(Math.log(minBucketTopNeeded) / Math.log(newBase)).toInt - 1
// Even if the two schemes have the same scale, they can have non-overlapping bucket ranges.
// The new bucket scheme should have at most maxBuckets, so keep reducing scale until within limits.
while (newBucketIndexEnd - newBucketIndexStart > maxBuckets) {
newScale -= 1
newBase = Math.pow(2, Math.pow(2, -newScale))
@@ -749,35 +752,74 @@ final case class Base2ExpHistogramBuckets(scale: Int,
}
}
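To make the scale-reduction loop above easier to follow, here is a hedged sketch (not part of this commit) of the quantity it is bounding; each decrement of scale squares the base, so this span roughly halves per iteration. The helper name and parameters are illustrative only.

  // Sketch only: number of base-2-exponential bucket indexes needed to cover [minBucketTop, maxBucketTop]
  // at a given scale, mirroring the newBucketIndexStart/newBucketIndexEnd math above.
  def bucketIndexSpanSketch(scale: Int, minBucketTop: Double, maxBucketTop: Double): Int = {
    val base = Math.pow(2, Math.pow(2, -scale))
    val endIndex = Math.ceil(Math.log(maxBucketTop) / Math.log(base)).toInt - 1
    val startIndex = Math.floor(Math.log(minBucketTop) / Math.log(base)).toInt - 1
    endIndex - startIndex
  }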

/**
* Converts an OTel exponential bucket index to an array index.
* For example, if startIndexPositiveBuckets = -5 and numPositiveBuckets = 10, then
* -5 will return 1, -4 will return 2, 0 will return 6, and 4 will return 10.
* Note that array index 0 is reserved for the zero bucket.
*/
def bucketIndexToArrayIndex(index: Int): Int = index - startIndexPositiveBuckets + 1
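A quick illustrative usage of the mapping documented above, with hypothetical values (not part of this commit), shown as comments since it belongs in a test rather than in this class body:

  // val b = Base2ExpHistogramBuckets(scale = 3, startIndexPositiveBuckets = -5, numPositiveBuckets = 10)
  // b.bucketIndexToArrayIndex(-5)  // == 1, first positive bucket lands just after the zero bucket
  // b.bucketIndexToArrayIndex(0)   // == 6
  // b.bucketIndexToArrayIndex(4)   // == 10, last positive bucket; numBuckets is 11 including the zero bucket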

/**
* Add the otherValues using otherBuckets scheme to ourValues. This method assumes (a) ourValues uses "this"
* bucket scheme and (b) this bucket scheme can accommodate otherBuckets.
*/
def addValues(ourValues: Array[Double],
otherBuckets: Base2ExpHistogramBuckets,
otherHistogram: HistogramWithBuckets): Unit = {
// TODO remove the require once code is stable
// TODO consider removing the require once code is stable
require(ourValues.length == numBuckets)
require(otherHistogram.numBuckets == otherBuckets.numBuckets)
require(canAccommodate(otherBuckets))
val scaleIncrease = otherBuckets.scale - scale
// for each of ourValues, find the corresponding bucket in otherValues and add its value
val fac = Math.pow(2 , otherBuckets.scale - scale).toInt
val fac = Math.pow(2, scaleIncrease).toInt
ourValues(0) += otherHistogram.bucketValue(0) // add the zero bucket
// This algorithm takes advantage of the relationship between scale and bucketIndexPlus1:
// when scale increases by scaleIncrease, the otherBucketIndexPlus1 for the same bucket boundary
// in the other scheme is 2^scaleIncrease (which is fac) times ourBucketIndexPlus1.
cforRange { startIndexPositiveBuckets until startIndexPositiveBuckets + numPositiveBuckets } { ourBucketIndex =>
val ourBucketIndexPlus1 = ourBucketIndex + 1
val otherBucketIndexPlus1 = ourBucketIndexPlus1 * fac
val ourArrayIndex = bucketIndexToArrayIndex(ourBucketIndexPlus1 - 1)
// ourArrayIndex is guaranteed to be within bounds since we are iterating our bucket range

val otherArrayIndex = otherBuckets.bucketIndexToArrayIndex(otherBucketIndexPlus1 - 1)
if (otherArrayIndex > 0 && otherArrayIndex < otherBuckets.numBuckets) {
// TODO remove this require once code is stable
// there is a possibility that otherArrayIndex is out of bounds, so we need to check

if (otherArrayIndex > 0 && otherArrayIndex < otherBuckets.numBuckets) { // add if within bounds

// Example 1: add every 2nd bucket from other to our bucket
// other scale = 2 . . . . . . . . . . .
// ↓ ↓ ↓ ↓ ↓ ↓
// our scale = 1 . . . . . .
//
// Example 2: add every 4th bucket from other to our bucket
// other scale = 3 . . . . . . . . . . . . .
// ↓ ↓ ↓ ↓
// our scale = 1 . . . .

// TODO consider removing the require once code is stable
require(ourArrayIndex != 0, "double counting zero bucket")
require( Math.abs(bucketTop(ourArrayIndex) - otherBuckets.bucketTop(otherArrayIndex)) <= 0.000001,
s"Bucket tops of $ourArrayIndex and $otherArrayIndex don't match:" +
s" ${bucketTop(ourArrayIndex)} != ${otherBuckets.bucketTop(otherArrayIndex)}")
ourValues(ourArrayIndex) += otherHistogram.bucketValue(otherArrayIndex)
} else if (otherArrayIndex >= otherBuckets.numBuckets) {
// if otherArrayIndex higher than upper bound, add to our last bucket since bucket counts are cumulative

// Example 3: add every 4th bucket from other to our
// other scale = 4 . . . . . . . . . . . . X add last bucket X here
// ↓ ↓ ↓ ↓ ↓ ↓
// our scale = 2 . . . . . .

// TODO consider removing the require once code is stable
require(ourArrayIndex != 0, "double counting zero bucket")
ourValues(ourArrayIndex) += otherHistogram.bucketValue(otherBuckets.numBuckets - 1)
}
// if otherArrayIndex is less than lower bound, the counts are already included and we don't need
// to add since the counts are cumulative
}
}
}
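A small worked example (not part of this commit) of the index relationship addValues relies on: with our scale = 1 and other scale = 2, fac = 2, so our bucket index 2 (plus1 = 3) lines up with other bucket index 5 (plus1 = 6), and the two bucket tops agree:

  // Both evaluate to 2^1.5 ≈ 2.828, so the cumulative counts of these two buckets can be added directly.
  val ourTop   = Math.pow(Math.pow(2, Math.pow(2, -1.0)), 2 + 1)  // scale 1, bucket index 2
  val otherTop = Math.pow(Math.pow(2, Math.pow(2, -2.0)), 5 + 1)  // scale 2, bucket index 5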
