diff --git a/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala b/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala
index bb59a3324f..2f36a59bd6 100644
--- a/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala
+++ b/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala
@@ -84,7 +84,7 @@ trait Histogram extends Ordered[Histogram] {
     // now calculate quantile. If bucket is last one and last bucket is +Inf then return second-to-last bucket top
     // as we cannot interpolate to +Inf.
     if (bucket == numBuckets-1 && bucketTop(numBuckets - 1).isPosInfinity) {
-      return bucketTop(bucket-1)
+      return bucketTop(numBuckets-2)
     } else if (bucket == 0 && bucketTop(0) <= 0) {
       return bucketTop(0) // zero or negative bucket
     } else {
diff --git a/memory/src/test/scala/filodb.memory/format/vectors/HistogramTest.scala b/memory/src/test/scala/filodb.memory/format/vectors/HistogramTest.scala
index 5e177b2f90..7515dd3473 100644
--- a/memory/src/test/scala/filodb.memory/format/vectors/HistogramTest.scala
+++ b/memory/src/test/scala/filodb.memory/format/vectors/HistogramTest.scala
@@ -11,6 +11,14 @@ object HistogramTest {
     Array[Double](11, 16, 26, 27, 33, 42, 46, 55),
     Array[Double](4, 4, 5, 33, 35, 67, 91, 121)
   )
+
+  val rawHistBuckets2 = Seq(
+    Array[Double](0, 25, 17, 20, 25, 34, 76, 82),
+    Array[Double](0, 22, 26, 26, 36, 38, 56, 59),
+    Array[Double](0, 27, 26, 27, 33, 42, 46, 55),
+    Array[Double](0, 8, 5, 33, 35, 67, 91, 121)
+  )
+
   val rawLongBuckets = rawHistBuckets.map(_.map(_.toLong))
   val mutableHistograms = rawHistBuckets.map { buckets =>
     // Create a new copy here, so that mutations don't affect original array
@@ -26,7 +34,7 @@ object HistogramTest {
   }
 
   val otelExpBuckets = Base2ExpHistogramBuckets(3, -3, 7)
-  val otelExpHistograms = rawHistBuckets.map { buckets =>
+  val otelExpHistograms = rawHistBuckets2.map { buckets =>
     LongHistogram(otelExpBuckets, buckets.take(otelExpBuckets.numBuckets).map(_.toLong))
   }
 
@@ -245,6 +253,32 @@ class HistogramTest extends NativeVectorTest {
     h3.quantile(0.01, 1.0, 1617.0) shouldEqual 1.0 +- 0.1 // must use the starting reference from min
   }
 
+  it("should calculate more accurate quantile for otel exponential histogram using max and min column") {
+    // bucket boundaries for scale 3, range -3 to 3, 7 buckets
+    // zeroBucket: 0.0, -3: 0.840896, -2: 0.917004, -1: 1.000000, 0: 1.090508, 1: 1.189207, 2: 1.296840, 3: 1.414214
+    val exp95thWithoutMinMax = Seq(1.3329136609345256, 1.2987136172035367, 1.3772644080792311, 1.3897175222720994)
+    otelExpHistograms.map { h =>
+      h.quantile(0.95)
+    }.zip(exp95thWithoutMinMax).foreach { case (q, e) => q shouldEqual e +- 0.0001 }
+
+    // notice how the quantiles are calculated within the max now, unlike before
+    val exp95thWithMinMax = Seq(1.2978395301558558, 1.296892165727062, 1.2990334920692943, 1.2993620241156902)
+    otelExpHistograms.map { h =>
+      h.quantile(0.95, 0.78, 1.3) // notice 1.3 max is less than last bucket
+    }.zip(exp95thWithMinMax).foreach { case (q, e) => q shouldEqual e +- 0.0001 }
+
+    val exp5thWithoutMinMax = Seq(0.13790701210160922, 0.11275656477265718, 0.08564685710917462, 0.6359279140356217)
+    otelExpHistograms.map { h =>
+      h.quantile(0.05)
+    }.zip(exp5thWithoutMinMax).foreach { case (q, e) => q shouldEqual e +- 0.0001 }
+
+    // notice how the quantiles are calculated within the min now, unlike before
+    val exp5thWithMinMax = Seq(0.7896758526873408, 0.7879023377662122, 0.7859951235344664, 0.825628309567315)
+    otelExpHistograms.map { h =>
+      h.quantile(0.05, 0.78, 1.3) // notice 0.78 min is less than first non-zero bucket, but bigger than previous otel bucket if it existed
+    }.zip(exp5thWithMinMax).foreach { case (q, e) => q shouldEqual e +- 0.0001 }
+  }
+
   it("should serialize to and from BinaryHistograms and compare correctly") {
     val binHistograms = mutableHistograms.map { h =>
       val buf = new ExpandableArrayBuffer()
diff --git a/query/src/test/scala/filodb/query/exec/rangefn/AggrOverTimeFunctionsSpec.scala b/query/src/test/scala/filodb/query/exec/rangefn/AggrOverTimeFunctionsSpec.scala
index 0a71367863..d5479e09d9 100644
--- a/query/src/test/scala/filodb/query/exec/rangefn/AggrOverTimeFunctionsSpec.scala
+++ b/query/src/test/scala/filodb/query/exec/rangefn/AggrOverTimeFunctionsSpec.scala
@@ -17,6 +17,7 @@ import filodb.memory._
 import filodb.memory.data.ChunkMap
 import filodb.memory.format.{TupleRowReader, vectors => bv}
 import filodb.memory.BinaryRegion.NativePointer
+import filodb.memory.format.vectors.MutableHistogram
 import filodb.query.exec._
 
 /**
@@ -313,6 +314,39 @@ class AggrOverTimeFunctionsSpec extends RawDataWindowingSpec {
     }
   }
 
+  // Goal of this test is to verify histogram rate functionality for different time windows.
+  // See PeriodicSampleMapperSpec for verifying integration of histograms with min and max.
+  it("should aggregate both max/min and hist for rate when min/max is in schema") {
+    val (data, rv) = MMD.histMaxMinRV(defaultStartTS, pubFreq, 150, 8)
+    (0 until numIterations).foreach { x =>
+      val windowSize = rand.nextInt(50) + 10
+      val step = rand.nextInt(50) + 5
+      info(s"iteration $x windowSize=$windowSize step=$step")
+
+      val row = new TransientHistMaxMinRow()
+      val chunkedIt = chunkedWindowItHist(data, rv, new RateAndMinMaxOverTimeFuncHD(5, 4), windowSize, step, row)
+      chunkedIt.zip(data.sliding(windowSize, step)).foreach { case (aggRow, rawDataWindow) =>
+        val aggHist = aggRow.getHistogram(1)
+        val sumRawHist = rawDataWindow.map(_(3).asInstanceOf[bv.LongHistogram])
+          .foldLeft(emptyAggHist) { case (agg, h) => agg.add(h); agg }
+        aggHist.asInstanceOf[MutableHistogram].values // what we got
+          .zip(sumRawHist.values.map(_/(windowSize-1)/10)) // expected
+          .foreach { case (a, b) =>
+            a shouldEqual b +- 0.0000001
+          }
+
+        val maxMax = rawDataWindow.map(_(5).asInstanceOf[Double])
+          .foldLeft(0.0) { case (agg, m) => Math.max(agg, m) }
+        aggRow.getDouble(2) shouldEqual maxMax
+
+        val minMin = rawDataWindow.map(_(4).asInstanceOf[Double])
+          .foldLeft(Double.MaxValue) { case (agg, m) => Math.min(agg, m) }
+        aggRow.getDouble(3) shouldEqual minMin
+
+      }
+    }
+  }
+
   it("should correctly aggregate min_over_time / max_over_time using both chunked and sliding iterators") {
     val data = (1 to 240).map(_.toDouble)
     val chunkSize = 40
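
A note on the Histogram.scala change: inside that branch the condition already guarantees `bucket == numBuckets-1`, so `bucketTop(bucket-1)` and `bucketTop(numBuckets-2)` address the same element; the new form simply states the intent (top of the second-to-last bucket) directly. A minimal standalone sketch of the guard, using hypothetical names rather than FiloDB's actual API:

```scala
// Hypothetical helper (illustration only, not FiloDB's method): when the target
// rank lands in the last bucket and that bucket's top is +Inf, interpolation
// toward +Inf is impossible, so the second-to-last bucket top is returned.
def lastBucketGuard(bucket: Int, bucketTops: Array[Double]): Option[Double] = {
  val numBuckets = bucketTops.length
  if (bucket == numBuckets - 1 && bucketTops(numBuckets - 1).isPosInfinity)
    Some(bucketTops(numBuckets - 2))  // cannot interpolate to +Inf
  else
    None                              // caller falls through to interpolation
}
```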
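The bucket tops quoted in the new HistogramTest comment follow the OpenTelemetry exponential-histogram rule that, for a given scale, base = 2^(2^-scale) and the bucket at index i has upper boundary base^(i+1). A minimal sketch (standalone, with illustrative names) that reproduces the scale-3 values:

```scala
// Reproduce the scale-3 bucket tops listed in the test comment.
// OTel exponential histograms: base = 2^(2^-scale); bucket i tops out at base^(i+1).
object ExpBucketTops extends App {
  val scale = 3
  val base  = math.pow(2, math.pow(2, -scale))  // 2^(1/8) ≈ 1.090508
  (-3 to 3).foreach { i =>
    println(f"bucket $i%2d top = ${math.pow(base, i + 1)}%.6f")
  }
  // -3: 0.840896, -2: 0.917004, -1: 1.000000, 0: 1.090508,
  //  1: 1.189207,  2: 1.296840,  3: 1.414214
}
```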
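On the expected values in the new AggrOverTimeFunctionsSpec test: a rate is the windowed increase divided by elapsed time, and the test divides each summed bucket by `(windowSize-1)` and then by 10. That normalization is consistent with a 10-second sample cadence; the value of `pubFreq` is defined outside this diff, so the cadence here is an assumption. A quick sanity-check sketch:

```scala
// Sanity check of the test's normalization, assuming samples arrive every 10 s
// (i.e. pubFreq = 10000 ms, an assumption not shown in this hunk).
object RateNormalization extends App {
  val windowSize     = 20                        // samples per window
  val windowedSum    = 380.0                     // hypothetical bucket increase
  val elapsedSeconds = (windowSize - 1) * 10.0   // first-to-last sample span
  // Same value two ways: explicit rate vs the test's chained division.
  println(windowedSum / elapsedSeconds)          // 2.0
  println(windowedSum / (windowSize - 1) / 10)   // 2.0
}
```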