
Commit

PR comments
vishramachandran committed Nov 6, 2024
1 parent 464aade commit 3a3f01e
Showing 3 changed files with 34 additions and 13 deletions.
@@ -55,6 +55,7 @@ object NodeClusterActor {
    * @param source the IngestionSource on each node. Use noOpSource to not start ingestion and
    *               manually push records into NodeCoordinator.
    * @param storeConfig a StoreConfig for the MemStore.
+   * @param overrideSchema if true, will override the schema in the MetaStore with the schema in the Dataset.
    * @return DatasetVerified - meaning the dataset and columns are valid. Does not mean ingestion is
    *                          setup on all nodes - for that, subscribe to ShardMapUpdate's
    */
40 changes: 30 additions & 10 deletions memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala
@@ -249,15 +249,19 @@ final case class MutableHistogram(var buckets: HistogramBuckets,
       // if our buckets is subset of other buckets, then we can add the values
       if (ourBuckets.canAccommodate(otherBuckets)) {
         ourBuckets.addValues(values, otherBuckets, other)
-        true
+        // since we are making the exp histogram buckets cumulative during
+        // ingestion, we can assume cumulative bucket values
+        false
       } else {
         val newBuckets = ourBuckets.add(otherBuckets) // create new buckets that can accommodate both
         val newValues = new Array[Double](newBuckets.numBuckets) // new values array
         newBuckets.addValues(newValues, ourBuckets, this)
         newBuckets.addValues(newValues, otherBuckets, other)
         buckets = newBuckets
         values = newValues
-        true
+        // since we are making the exp histogram buckets cumulative during
+        // ingestion, we can assume cumulative bucket values
+        false
       }
     }
     else {
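A note on this hunk: the new comments rely on exponential-histogram bucket values being made cumulative at ingestion time. As a rough, hypothetical sketch (not FiloDB code), converting OTel-style per-bucket delta counts into that cumulative form looks like:

```scala
// Hypothetical sketch: fold per-bucket (delta) counts into cumulative counts,
// so that values(i) includes every observation at or below bucketTop(i).
def toCumulative(deltaCounts: Array[Double]): Array[Double] = {
  val cumulative = deltaCounts.clone()
  var i = 1
  while (i < cumulative.length) {
    cumulative(i) += cumulative(i - 1) // each bucket absorbs all lower buckets
    i += 1
  }
  cumulative
}
```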
@@ -521,6 +525,8 @@ final case class GeometricBuckets(firstBucket: Double,
 object Base2ExpHistogramBuckets {
   // TODO: make maxBuckets default configurable; not straightforward to get handle to global config from here
   val maxBuckets = 200
+  val maxAbsScale = 100
+  val maxAbsBucketIndex = 500
 }

 /**
@@ -548,10 +554,19 @@ final case class Base2ExpHistogramBuckets(scale: Int,
                                           startIndexNegativeBuckets: Int = 0,
                                           numNegativeBuckets: Int = 0
                                          ) extends HistogramBuckets {
-  require(numPositiveBuckets <= Base2ExpHistogramBuckets.maxBuckets && numPositiveBuckets >= 0,
-    s"Invalid buckets: numPositiveBuckets=$numPositiveBuckets maxBuckets=${Base2ExpHistogramBuckets.maxBuckets}")
+  import Base2ExpHistogramBuckets._
+  require(numPositiveBuckets <= maxBuckets && numPositiveBuckets >= 0,
+    s"Invalid buckets: numPositiveBuckets=$numPositiveBuckets maxBuckets=${maxBuckets}")
   require(numNegativeBuckets == 0 && startIndexNegativeBuckets == 0, "Negative buckets not supported yet")
-  require(startIndexPositiveBuckets > -500 && startIndexPositiveBuckets < 500)
+  // limit bucket index and scale values, since the corresponding bucketTop values can get very large or
+  // very small, beyond double precision. Note we are also serializing these as shorts in the binary
+  // format, so they need to be within the limits of a short
+  require(startIndexPositiveBuckets > -maxAbsBucketIndex && startIndexPositiveBuckets < maxAbsBucketIndex,
+    s"Invalid startIndexPositiveBuckets $startIndexPositiveBuckets should be between " +
+    s"${-maxAbsBucketIndex} and ${maxAbsBucketIndex}")
+  require(scale > -maxAbsScale && scale < maxAbsScale,
+    s"Invalid scale $scale should be between ${-maxAbsScale} and ${maxAbsScale}")
+
   val base: Double = Math.pow(2, Math.pow(2, -scale))
   val startBucketTop: Double = bucketTop(1)
   val endBucketTop: Double = bucketTop(numBuckets - 1)
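To see why both bounds are needed: bucketTop grows as base^(index + 1) with base = 2^(2^-scale), so a large |index| or a small scale quickly exhausts Double range. A quick illustrative check, mirroring the formulas above:

```scala
// bucketTop of a base-2 exponential histogram bucket, per the definition above
def bucketTop(scale: Int, index: Int): Double = {
  val base = Math.pow(2, Math.pow(2, -scale)) // e.g. scale 0 -> base 2
  Math.pow(base, index + 1)
}

bucketTop(0, 499)  // ~2^500 ≈ 3.3e150, still representable as a Double
bucketTop(-1, 600) // 4^601 = 2^1202 -> Double.PositiveInfinity, past Double range
```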
@@ -572,8 +587,11 @@ final case class Base2ExpHistogramBuckets(scale: Int,
   }

   final def serialize(buf: MutableDirectBuffer, pos: Int): Int = {
-    require(scale < 100 && scale > -100, s"Unsupported scale $scale")
-    require(numBuckets < 65536, s"Too many buckets: $numBuckets")
+    require(numBuckets < Short.MaxValue, s"numBuckets overflow: $numBuckets")
+    require(startIndexPositiveBuckets < Short.MaxValue,
+      s"startIndexPositiveBuckets overflow: $startIndexPositiveBuckets")
+    require(scale < Short.MaxValue, s"scale overflow: $scale")
+    require(numPositiveBuckets < Short.MaxValue, s"numPositiveBuckets overflow: $numPositiveBuckets")
     // per BinHistogram format, bucket def len comes first always
     buf.putShort(pos, (2 + 2 + 4 + 4).toShort)
     // numBuckets comes next always
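These requires all guard the same hazard: per the comment added to Base2ExpHistogramBuckets, these fields are serialized as shorts, and a value at or above Short.MaxValue would wrap silently when narrowed. A generic form of the guard (hypothetical helper, not in the codebase) would be:

```scala
// Hypothetical helper: fail fast before narrowing an Int into a 2-byte field.
def toShortChecked(value: Int, name: String): Short = {
  require(value < Short.MaxValue && value > Short.MinValue, s"$name overflow: $value")
  value.toShort
}
```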
@@ -612,10 +630,11 @@ final case class Base2ExpHistogramBuckets(scale: Int,
     if (canAccommodate(o)) {
       this
     } else {
+      // calculate new bucket scheme that can accommodate both bucket ranges
       val minBucketTopNeeded = Math.min(startBucketTop, o.startBucketTop)
       val maxBucketTopNeeded = Math.max(endBucketTop, o.endBucketTop)
       var newScale = Math.min(scale, o.scale)
-      var newBase = Math.pow(2, Math.pow(2, -newScale))
+      var newBase = Math.max(base, o.base)
       var newBucketIndexEnd = Math.ceil(Math.log(maxBucketTopNeeded) / Math.log(newBase)).toInt - 1
       var newBucketIndexStart = Math.floor(Math.log(minBucketTopNeeded) / Math.log(newBase)).toInt - 1
       while (newBucketIndexEnd - newBucketIndexStart > maxBuckets) {
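The newBase change is an equivalence rather than a behavior change: base = 2^(2^-scale) decreases monotonically as scale increases, so the minimum of the two scales corresponds exactly to the maximum of the two already-computed bases, saving a pow call. A quick sanity check of the identity:

```scala
// base(scale) = 2^(2^-scale) is monotonically decreasing in scale,
// so base(min(s1, s2)) == max(base(s1), base(s2))
def base(scale: Int): Double = Math.pow(2, Math.pow(2, -scale))

val (s1, s2) = (3, 1)
assert(base(Math.min(s1, s2)) == Math.max(base(s1), base(s2))) // both ~1.4142
```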
@@ -642,13 +661,14 @@ final case class Base2ExpHistogramBuckets(scale: Int,
     // for each ourValues, find the bucket index in otherValues and add the value
     val fac = Math.pow(2, otherBuckets.scale - scale).toInt
     ourValues(0) += otherHistogram.bucketValue(0) // add the zero bucket
-    cforRange { 0 until ourValues.length - 1 } { i =>
-      val ourBucketIndexPlus1 = startIndexPositiveBuckets + i + 1
+    cforRange { startIndexPositiveBuckets until startIndexPositiveBuckets + numPositiveBuckets } { ourBucketIndex =>
+      val ourBucketIndexPlus1 = ourBucketIndex + 1
       val otherBucketIndexPlus1 = ourBucketIndexPlus1 * fac
       val ourArrayIndex = bucketIndexToArrayIndex(ourBucketIndexPlus1 - 1)
       val otherArrayIndex = otherBuckets.bucketIndexToArrayIndex(otherBucketIndexPlus1 - 1)
       if (otherArrayIndex > 0 && otherArrayIndex < otherBuckets.numBuckets) {
         // TODO remove this require once code is stable
+        require(ourArrayIndex != 0, "double counting zero bucket")
         require(Math.abs(bucketTop(ourArrayIndex) - otherBuckets.bucketTop(otherArrayIndex)) <= 0.000001,
           s"Bucket tops of $ourArrayIndex and $otherArrayIndex don't match:" +
           s" ${bucketTop(ourArrayIndex)} != ${otherBuckets.bucketTop(otherArrayIndex)}")
@@ -23,7 +23,7 @@ import filodb.memory.format.MemoryReader._
  *   0x03  geometric   + NibblePacked delta Long values
  *   0x04  geometric_1 + NibblePacked delta Long values  (see [[HistogramBuckets]])
  *   0x05  custom LE/bucket values + NibblePacked delta Long values
- *   0x09  geometric_2 + NibblePacked delta Long values  (see [[Base2ExpHistogramBuckets]])
+ *   0x09  otelExp_Delta + NibblePacked delta Long values  (see [[Base2ExpHistogramBuckets]])
  *
  *  +0003  u16  2-byte length of Histogram bucket definition
  *  +0005  [u8] Histogram bucket definition, see [[HistogramBuckets]]
@@ -114,7 +114,7 @@ object BinaryHistogram extends StrictLogging {
   def isValidFormatCode(code: Byte): Boolean = {
     (code == HistFormat_Null) || (code == HistFormat_Geometric1_Delta) || (code == HistFormat_Geometric_Delta) ||
       (code == HistFormat_Custom_Delta) || (code == HistFormat_OtelExp_Delta)
-    // Question: why are other formats not here as valid?
+    // Question: why are other formats like HistFormat_Geometric_XOR not here as valid?
   }

 /**
@@ -150,7 +150,7 @@ object BinaryHistogram extends StrictLogging {
    * @return the number of bytes written, including the length prefix
    */
   def writeDelta(buckets: HistogramBuckets, values: Array[Long], buf: MutableDirectBuffer): Int = {
-    require(buckets.numBuckets == values.size, s"Values array size of ${values.size} != ${buckets.numBuckets}")
+    require(buckets.numBuckets == values.length, s"Values array size of ${values.length} != ${buckets.numBuckets}")
     val formatCode = if (buckets.numBuckets == 0) HistFormat_Null else buckets match {
       case g: GeometricBuckets if g.minusOne => HistFormat_Geometric1_Delta
       case g: GeometricBuckets => HistFormat_Geometric_Delta
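The size-to-length swap here is presumably a micro-optimization: on a Scala Array, .length reads the JVM array length directly, whereas .size goes through the implicit ArrayOps wrapper before delegating to length.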