From edd3288671af2db1917102c758246040c56cd4e5 Mon Sep 17 00:00:00 2001
From: Vish Ramachandran
Date: Wed, 13 Nov 2024 14:49:51 -0800
Subject: [PATCH 1/8] histogram_fraction for min-max

---
 .../format/vectors/Histogram.scala            | 55 ++-----------------
 .../format/vectors/HistogramTest.scala        | 30 +++++-----
 .../query/exec/rangefn/InstantFunction.scala  | 30 ++++++----
 3 files changed, 37 insertions(+), 78 deletions(-)

diff --git a/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala b/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala
index 12c74d2e72..6331c7bf21 100644
--- a/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala
+++ b/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala
@@ -62,7 +62,9 @@ trait Histogram extends Ordered[Histogram] {
   /**
    * Calculates histogram quantile based on bucket values using Prometheus scheme (increasing/LE)
    */
-  def quantile(q: Double): Double = {
+  def quantile(q: Double,
+               min: Double = Double.NegativeInfinity,
+               max: Double = Double.PositiveInfinity): Double = {
     val result = if (q < 0) Double.NegativeInfinity
     else if (q > 1) Double.PositiveInfinity
     else if (numBuckets < 2) Double.NaN
@@ -78,7 +80,7 @@
       else if (b == 0 && bucketTop(0) <= 0) return bucketTop(0)
       else {
         // interpolate quantile within le bucket
-        var (bucketStart, bucketEnd, count) = (0d, bucketTop(b), bucketValue(b))
+        var (bucketStart, bucketEnd, count) = (Math.max(0d, min), Math.min(bucketTop(b), max), bucketValue(b))
         if (b > 0) {
           bucketStart = bucketTop(b-1)
           count -= bucketValue(b-1)
@@ -264,7 +266,6 @@ final case class LongHistogram(var buckets: HistogramBuckets, var values: Array[
     }
     // TODO if otel histogram, the need to add values in a different way
     // see if we can refactor since MutableHistogram also has this logic
-    assert(other.buckets == buckets)
     cforRange { 0 until numBuckets } { b =>
       values(b) += other.values(b)
     }
@@ -433,54 +434,6 @@ object MutableHistogram {
   }
 }
 
-/**
- * MaxMinHistogram improves quantile calculation accuracy with a known max value recorded from the client.
- * Whereas normally Prom histograms have +Inf as the highest bucket, and we cannot interpolate above the last
- * non-Inf bucket, having a max allows us to interpolate from the rank up to the max.
- * When the max value is lower, we interpolate between the bottom of the bucket and the max value.
- * Both changes mean that the 0.90+ quantiles return much closer to the max value, instead of interpolating or clipping.
- * The quantile result can never be above max, regardless of the bucket scheme.
- *
- * UPDATE: Adding support for min value as well to make sure the quantile is never below the minimum specified value.
- * By default, the minimum value is set to 0.0
- */
-final case class MaxMinHistogram(innerHist: HistogramWithBuckets, max: Double, min: Double = 0.0)
-  extends HistogramWithBuckets {
-  final def buckets: HistogramBuckets = innerHist.buckets
-  final def bucketValue(no: Int): Double = innerHist.bucketValue(no)
-
-  def serialize(intoBuf: Option[MutableDirectBuffer] = None): MutableDirectBuffer = ???
-
-  override def quantile(q: Double): Double = {
-    val result = if (q < 0) Double.NegativeInfinity
-    else if (q > 1) Double.PositiveInfinity
-    else if (numBuckets < 2) Double.NaN
-    else {
-      // find rank for the quantile using total number of occurrences (which is the last bucket value)
-      var rank = q * topBucketValue
-      // using rank, find the le bucket which would have the identified rank
-      val bucketNum = firstBucketGTE(rank)
-
-      // now calculate quantile. No need to special case top bucket since we will always cap top at max
-      if (bucketNum == 0 && bucketTop(0) <= 0) return bucketTop(0)
-      else {
-        // interpolate quantile within le bucket
-        var (bucketStart, bucketEnd, count) = (Math.max(0d, min),
-          Math.min(bucketTop(bucketNum), max), bucketValue(bucketNum))
-        if (bucketNum > 0) {
-          bucketStart = bucketTop(bucketNum-1)
-          count -= bucketValue(bucketNum-1)
-          rank -= bucketValue(bucketNum-1)
-        }
-        bucketStart + (bucketEnd-bucketStart)*(rank/count)
-      }
-    }
-    result
-  }
-
-
-}
-
 /**
  * A scheme for buckets in a histogram.  Since these are Prometheus-style histograms,
  * each bucket definition consists of occurrences of numbers which are less than or equal to the bucketTop
diff --git a/memory/src/test/scala/filodb.memory/format/vectors/HistogramTest.scala b/memory/src/test/scala/filodb.memory/format/vectors/HistogramTest.scala
index d781e22b02..8b8b699724 100644
--- a/memory/src/test/scala/filodb.memory/format/vectors/HistogramTest.scala
+++ b/memory/src/test/scala/filodb.memory/format/vectors/HistogramTest.scala
@@ -169,34 +169,34 @@ class HistogramTest extends NativeVectorTest {
   }
 
   it("should calculate more accurate quantile with MaxMinHistogram using max column") {
-    val h = MaxMinHistogram(mutableHistograms(0), 90)
-    h.quantile(0.95) shouldEqual 72.2 +- 0.1   // more accurate due to max!
+    val h = mutableHistograms(0)
+    h.quantile(0.95, 0 ,90) shouldEqual 72.2 +- 0.1   // more accurate due to max!
 
     // Not just last bucket, but should always be clipped at max regardless of bucket scheme
     val values = Array[Double](10, 15, 17, 20, 25, 25, 25, 25)
-    val h2 = MaxMinHistogram(MutableHistogram(bucketScheme, values), 10)
-    h2.quantile(0.95) shouldEqual 9.5 +- 0.1   // more accurate due to max!
+    val h2 = MutableHistogram(bucketScheme, values)
+    h2.quantile(0.95, 0, 10) shouldEqual 9.5 +- 0.1   // more accurate due to max!
 
     val values3 = Array[Double](1, 1, 1, 1, 1, 4, 7, 7, 9, 9) ++ Array.fill(54)(12.0)
-    val h3 = MaxMinHistogram(MutableHistogram(HistogramBuckets.binaryBuckets64, values3), 1617.0)
-    h3.quantile(0.99) shouldEqual 1593.2 +- 0.1
-    h3.quantile(0.90) shouldEqual 1379.4 +- 0.1
+    val h3 = MutableHistogram(HistogramBuckets.binaryBuckets64, values3)
+    h3.quantile(0.99, 0, 1617.0) shouldEqual 1593.2 +- 0.1
+    h3.quantile(0.90, 0, 1617.0) shouldEqual 1379.4 +- 0.1
   }
 
   it("should calculate more accurate quantile with MaxMinHistogram using max and min column") {
-    val h = MaxMinHistogram(mutableHistograms(0), 100, 10)
-    h.quantile(0.95) shouldEqual 75.39 +- 0.1   // more accurate due to max, min!
+    val h = mutableHistograms(0)
+    h.quantile(0.95, 10, 100) shouldEqual 75.39 +- 0.1   // more accurate due to max, min!
 
     // Not just last bucket, but should always be clipped at max regardless of bucket scheme
     val values = Array[Double](10, 15, 17, 20, 25, 25, 25, 25)
-    val h2 = MaxMinHistogram(MutableHistogram(bucketScheme, values), 10, 2)
-    h2.quantile(0.95) shouldEqual 9.5 +- 0.1   // more accurate due to max, min!
+    val h2 = MutableHistogram(bucketScheme, values)
+    h2.quantile(0.95, 2, 10) shouldEqual 9.5 +- 0.1   // more accurate due to max, min!
 
     val values3 = Array[Double](1, 1, 1, 1, 1, 4, 7, 7, 9, 9) ++ Array.fill(54)(12.0)
-    val h3 = MaxMinHistogram(MutableHistogram(HistogramBuckets.binaryBuckets64, values3), 1617.0, 1.0)
-    h3.quantile(0.99) shouldEqual 1593.2 +- 0.1
-    h3.quantile(0.90) shouldEqual 1379.4 +- 0.1
-    h3.quantile(0.01) shouldEqual 1.0 +- 0.1 // must use the starting reference from min
+    val h3 = MutableHistogram(HistogramBuckets.binaryBuckets64, values3)
+    h3.quantile(0.99, 1.0, 1617.0) shouldEqual 1593.2 +- 0.1
+    h3.quantile(0.90, 1.0, 1617.0) shouldEqual 1379.4 +- 0.1
+    h3.quantile(0.01, 1.0, 1617.0) shouldEqual 1.0 +- 0.1 // must use the starting reference from min
   }
 
   it("should serialize to and from BinaryHistograms and compare correctly") {
diff --git a/query/src/main/scala/filodb/query/exec/rangefn/InstantFunction.scala b/query/src/main/scala/filodb/query/exec/rangefn/InstantFunction.scala
index 76dfaeb0ad..5e9b852c08 100644
--- a/query/src/main/scala/filodb/query/exec/rangefn/InstantFunction.scala
+++ b/query/src/main/scala/filodb/query/exec/rangefn/InstantFunction.scala
@@ -5,7 +5,7 @@ import java.time.{Instant, LocalDateTime, YearMonth, ZoneId, ZoneOffset}
 import spire.syntax.cfor._
 
 import filodb.core.query.ResultSchema
-import filodb.memory.format.vectors.{Histogram, MaxMinHistogram, MutableHistogram}
+import filodb.memory.format.vectors.Histogram
 import filodb.query.InstantFunctionId
 import filodb.query.InstantFunctionId._
 
@@ -114,7 +114,8 @@ object InstantFunction {
       if (sourceSchema.isHistMaxMin) HistogramQuantileWithMaxMinImpl() else HistogramQuantileImpl()
     case HistogramMaxQuantile => HistogramMaxQuantileImpl()
     case HistogramBucket => HistogramBucketImpl()
-    case HistogramFraction => HistogramFractionImpl()
+    case HistogramFraction =>
+      if (sourceSchema.isHistMaxMin) HistogramFractionWithMaxMinImpl() else HistogramFractionImpl()
     case _ => throw new UnsupportedOperationException(s"$function not supported.")
   }
 }
@@ -374,6 +375,19 @@ final case class HistogramFractionImpl() extends HistToDoubleIFunction {
   }
 }
 
+/**
+ * Histogram fraction function for Histogram columns, where all buckets are together. This takes the min and max
+ * columns into consideration.
+ */
+final case class HistogramFractionWithMaxMinImpl() extends HMaxMinToDoubleIFunction {
+  final def apply(value: Histogram, max: Double, min: Double, scalarParams: Seq[Double]): Double = {
+    require(scalarParams.length == 2, "Need two params for histogram fraction function")
+    val lower = scalarParams(0)
+    val upper = scalarParams(1)
+    value.histogramFraction(lower, upper, min, max)
+  }
+}
+
 /**
  * Histogram quantile function for Histogram columns, where all buckets are together. This takes the min and max
 * columns into consideration.
@@ -381,11 +395,7 @@ final case class HistogramFractionImpl() extends HistToDoubleIFunction {
 final case class HistogramQuantileWithMaxMinImpl() extends HMaxMinToDoubleIFunction {
   final def apply(value: Histogram, max: Double, min: Double, scalarParams: Seq[Double]): Double = {
     require(scalarParams.length == 1, "Quantile (between 0 and 1) required for histogram quantile")
-    val maxMinHist = value match {
-      case h: MutableHistogram => MaxMinHistogram(h, max, min)
-      case other: Histogram => MaxMinHistogram(MutableHistogram(other), max, min)
-    }
-    maxMinHist.quantile(scalarParams(0))
+    value.quantile(scalarParams(0), min, max)
   }
 }
 
@@ -398,11 +408,7 @@ final case class HistogramMaxQuantileImpl() extends HMaxMinToDoubleIFunction {
    */
   final def apply(hist: Histogram, max: Double, min: Double = Double.NaN, scalarParams: Seq[Double]): Double = {
     require(scalarParams.length == 1, "Quantile (between 0 and 1) required for histogram quantile")
-    val maxHist = hist match {
-      case h: MutableHistogram => MaxMinHistogram(h, max, 0d)
-      case other: Histogram => MaxMinHistogram(MutableHistogram(other), max, 0d)
-    }
-    maxHist.quantile(scalarParams(0))
+    hist.quantile(scalarParams(0), 0, max)
   }
 }
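Note on the API change in PATCH 1/8: callers that previously wrapped a histogram in MaxMinHistogram now pass min/max straight into quantile() or histogramFraction(). A minimal before/after sketch of the call pattern, reusing the bucketScheme/values fixtures from the updated HistogramTest (the fixture names come from the tests, not from new API):

    // before this patch (MaxMinHistogram is deleted above):
    //   MaxMinHistogram(MutableHistogram(bucketScheme, values), 10, 2).quantile(0.95)
    // after this patch:
    val h = MutableHistogram(bucketScheme, values)
    h.quantile(0.95)                        // min/max default to -Inf/+Inf, old behavior
    h.quantile(0.95, 2, 10)                 // quantile interpolated within [2, 10]
    h.histogramFraction(0.5, 2.0, 2, 10)    // fraction with the same min/max hints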
From d1db5deb4caddda08a067c21c0aa9f01b6b02e38 Mon Sep 17 00:00:00 2001
From: Vish Ramachandran
Date: Wed, 20 Nov 2024 09:20:43 -0800
Subject: [PATCH 2/8] improve min-max quantile with more tests

---
 .../format/vectors/Histogram.scala            | 45 +++++++--------
 .../format/vectors/HistogramTest.scala        | 48 ++++++++++++++++++-
 2 files changed, 73 insertions(+), 20 deletions(-)

diff --git a/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala b/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala
index 6331c7bf21..ba068bbae8 100644
--- a/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala
+++ b/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala
@@ -63,29 +63,35 @@ trait Histogram extends Ordered[Histogram] {
    * Calculates histogram quantile based on bucket values using Prometheus scheme (increasing/LE)
    */
   def quantile(q: Double,
-               min: Double = Double.NegativeInfinity,
+               min: Double = 0, // negative observations not supported yet
                max: Double = Double.PositiveInfinity): Double = {
     val result = if (q < 0) Double.NegativeInfinity
     else if (q > 1) Double.PositiveInfinity
-    else if (numBuckets < 2) Double.NaN
+    else if (numBuckets < 2 || topBucketValue <= 0) Double.NaN
     else {
       // find rank for the quantile using total number of occurrences (which is the last bucket value)
       var rank = q * topBucketValue
       // using rank, find the le bucket which would have the identified rank
-      val b = firstBucketGTE(rank)
+      val bucket = firstBucketGTE(rank)
+
+      // current bucket lower and upper bound; negative observations not supported yet - to be done later
+      var bucketStart = if (bucket == 0) 0 else bucketTop(bucket-1)
+      var bucketEnd = bucketTop(bucket)
+      // if min and max are in this bucket, adjust the bucket start and end
+      if (min > bucketStart && min <= bucketEnd) bucketStart = min
+      if (max > bucketStart && max <= bucketEnd) bucketEnd = max
 
       // now calculate quantile. If bucket is last one and last bucket is +Inf then return second-to-last bucket top
       // as we cannot interpolate to +Inf.
-      if (b == numBuckets-1 && bucketTop(numBuckets - 1).isPosInfinity) return bucketTop(numBuckets-2)
-      else if (b == 0 && bucketTop(0) <= 0) return bucketTop(0)
-      else {
-        // interpolate quantile within le bucket
-        var (bucketStart, bucketEnd, count) = (Math.max(0d, min), Math.min(bucketTop(b), max), bucketValue(b))
-        if (b > 0) {
-          bucketStart = bucketTop(b-1)
-          count -= bucketValue(b-1)
-          rank -= bucketValue(b-1)
-        }
+      if (bucket == numBuckets-1 && bucketTop(numBuckets - 1).isPosInfinity) {
+        return bucketTop(bucket-1)
+      } else if (bucket == 0 && bucketTop(0) <= 0) {
+        return bucketTop(0) // zero or negative bucket
+      } else {
+
+        // interpolate quantile within boundaries of "bucket"
+        val count = if (bucket == 0) bucketValue(bucket) else bucketValue(bucket) - bucketValue(bucket-1)
+        rank -= (if (bucket == 0) 0 else bucketValue(bucket-1))
         val fraction = rank/count
         if (!hasExponentialBuckets || bucketStart == 0) {
           bucketStart + (bucketEnd-bucketStart) * fraction
@@ -135,9 +141,9 @@ trait Histogram extends Ordered[Histogram] {
       val b = it.next()
       val zeroBucket = (b == 0)
       val bucketUpper = bucketTop(b)
-      val bucketLower = if (b == 0) 0.0 else bucketTop(b - 1)
+      val bucketLower = if (zeroBucket) 0.0 else bucketTop(b - 1)
       val bucketVal = bucketValue(b)
-      val prevBucketVal = if (b == 0) 0.0 else bucketValue(b - 1)
+      val prevBucketVal = if (zeroBucket) 0.0 else bucketValue(b - 1)
 
       // Define interpolation functions
       def interpolateLinearly(v: Double): Double = {
@@ -699,11 +705,12 @@ final case class Base2ExpHistogramBuckets(scale: Int,
     val maxBucketTopNeeded = Math.max(endBucketTop, o.endBucketTop)
     var newScale = Math.min(scale, o.scale)
     var newBase = Math.max(base, o.base)
-    var newBucketIndexEnd = Math.ceil(Math.log(maxBucketTopNeeded) / Math.log(newBase)).toInt - 1
-    var newBucketIndexStart = Math.floor(Math.log(minBucketTopNeeded) / Math.log(newBase)).toInt - 1
+    // minus one below since there is "+1" in `bucket(index) = base ^ (index + 1)`
+    var newBucketIndexEnd = Math.ceil(Math.log(maxBucketTopNeeded) / Math.log(newBase)).toInt - 1 // exclusive
+    var newBucketIndexStart = Math.floor(Math.log(minBucketTopNeeded) / Math.log(newBase)).toInt - 1 // inclusive
     // Even if the two schemes are of same scale, they can have non-overlapping bucket ranges.
     // The new bucket scheme should have at most maxBuckets, so keep reducing scale until within limits.
-    while (newBucketIndexEnd - newBucketIndexStart > maxBuckets) {
+    while (newBucketIndexEnd - newBucketIndexStart + 1 > maxBuckets) {
       newScale -= 1
       newBase = Math.pow(2, Math.pow(2, -newScale))
       newBucketIndexEnd = Math.ceil(Math.log(maxBucketTopNeeded) / Math.log(newBase)).toInt - 1
@@ -714,7 +721,7 @@ final case class Base2ExpHistogramBuckets(scale: Int,
   }
 
   /**
-   * Converts an OTel exponential index to array index.
+   * Converts an OTel exponential index to array index (aka bucket no).
    * For example if startIndexPositiveBuckets = -5 and numPositiveBuckets = 10, then
    * -5 will return 1, -4 will return 2, 0 will return 6, 4 will return 10.
    * Know that 0 array index is reserved for zero bucket.
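For readers unfamiliar with the OTel base-2 exponential scheme touched above: base = 2^(2^-scale) and bucket(index) = base^(index + 1), which is where the "minus one" in the index computations comes from. A small standalone sketch (not FiloDB code) that reproduces the bucket tops cited in the tests:

    object ExpBucketMath extends App {
      val scale = 3
      val base = math.pow(2, math.pow(2, -scale))     // 2^(1/8) ~= 1.090508
      def bucketTop(index: Int): Double = math.pow(base, index + 1)
      // For Base2ExpHistogramBuckets(3, -5, 11), the positive buckets run from
      // bucketTop(-5) up to bucketTop(5), matching the "0.707 to 1.68" comments
      // in HistogramTest.
      println(bucketTop(-5))   // ~0.7071
      println(bucketTop(5))    // ~1.6818
    }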
diff --git a/memory/src/test/scala/filodb.memory/format/vectors/HistogramTest.scala b/memory/src/test/scala/filodb.memory/format/vectors/HistogramTest.scala
index 8b8b699724..91cfc66d25 100644
--- a/memory/src/test/scala/filodb.memory/format/vectors/HistogramTest.scala
+++ b/memory/src/test/scala/filodb.memory/format/vectors/HistogramTest.scala
@@ -103,7 +103,6 @@ class HistogramTest extends NativeVectorTest {
     customHistograms(0).quantile(0.95) shouldEqual 10
   }
 
-
   it("should calculate quantile correctly for exponential bucket histograms") {
     val bucketScheme = Base2ExpHistogramBuckets(3, -5, 11) // 0.707 to 1.68
     val hist = MutableHistogram(bucketScheme, Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12))
@@ -117,6 +116,51 @@ class HistogramTest extends NativeVectorTest {
     hist.quantile(0.085) shouldEqual 0.014142135623730961 +- 0.00001
   }
 
+  it("quantile for exponential histogram with real data should match expected") {
+    val bucketScheme = Base2ExpHistogramBuckets(3, -78, 126)
+    val str = "0.0=0.0, 0.0012664448775888738=0.0, 0.0013810679320049727=0.0, 0.001506065259187439=0.0, " +
+      "0.0016423758110424079=0.0, 0.0017910235218841198=0.0, 0.001953124999999996=0.0, 0.002129897915361827=0.0, " +
+      "0.002322670146489685=0.0, 0.0025328897551777484=0.0, 0.002762135864009946=0.0, 0.0030121305183748786=0.0, " +
+      "0.0032847516220848166=0.0, 0.0035820470437682404=0.0, 0.003906249999999993=0.0, 0.004259795830723655=0.0, " +
+      "0.004645340292979371=0.0, 0.005065779510355498=0.0, 0.005524271728019893=0.0, 0.006024261036749759=0.0, " +
+      "0.006569503244169634=0.0, 0.007164094087536483=0.0, 0.007812499999999988=0.0, 0.008519591661447312=0.0, " +
+      "0.009290680585958744=0.0, 0.010131559020710997=0.0, 0.011048543456039788=0.0, 0.012048522073499521=0.0, " +
+      "0.013139006488339272=0.0, 0.014328188175072969=0.0, 0.01562499999999998=0.0, 0.017039183322894627=0.0, " +
+      "0.018581361171917492=0.0, 0.020263118041422=0.0, 0.022097086912079584=0.0, 0.024097044146999046=0.0, " +
+      "0.026278012976678547=0.0, 0.028656376350145944=0.0, 0.031249999999999965=0.0, 0.03407836664578927=0.0, " +
+      "0.037162722343835=0.0, 0.04052623608284401=0.0, 0.044194173824159175=0.0, 0.0481940882939981=0.0, " +
+      "0.05255602595335711=0.0, 0.0573127527002919=0.0, 0.062499999999999944=0.0, 0.06815673329157855=0.0, " +
+      "0.07432544468767001=0.0, 0.08105247216568803=0.0, 0.08838834764831838=0.0, 0.09638817658799623=0.0, " +
+      "0.10511205190671424=0.0, 0.11462550540058382=0.0, 0.12499999999999992=0.0, 0.13631346658315713=1.0, " +
+      "0.14865088937534005=1.0, 0.16210494433137612=1.0, 0.17677669529663678=1.0, 0.1927763531759925=1.0, " +
+      "0.21022410381342854=1.0, 0.2292510108011677=1.0, 0.2499999999999999=2.0, 0.2726269331663143=2.0, " +
+      "0.29730177875068015=3.0, 0.3242098886627523=3.0, 0.3535533905932736=3.0, 0.3855527063519851=3.0, " +
+      "0.42044820762685714=4.0, 0.4585020216023355=5.0, 0.4999999999999999=5.0, 0.5452538663326287=5.0, " +
+      "0.5946035575013604=6.0, 0.6484197773255047=6.0, 0.7071067811865475=8.0, 0.7711054127039704=8.0, " +
+      "0.8408964152537145=9.0, 0.9170040432046712=9.0, 1.0=11.0, 1.0905077326652577=12.0, 1.189207115002721=14.0, " +
+      "1.2968395546510099=15.0, 1.4142135623730951=17.0, 1.542210825407941=19.0, 1.6817928305074294=20.0, " +
+      "1.8340080864093429=22.0, 2.0000000000000004=23.0, 2.181015465330516=26.0, 2.378414230005443=28.0, " +
+      "2.59367910930202=31.0, 2.8284271247461907=34.0, 3.084421650815883=37.0, 3.3635856610148593=41.0, " +
+      "3.6680161728186866=45.0, 4.000000000000002=48.0, 4.3620309306610325=53.0, 4.756828460010887=58.0, " +
+      "5.187358218604041=64.0, 5.656854249492383=70.0, 6.168843301631767=76.0, 6.7271713220297205=84.0, " +
+      "7.336032345637374=90.0, 8.000000000000005=99.0, 8.724061861322067=108.0, 9.513656920021775=118.0, " +
+      "10.374716437208086=129.0, 11.31370849898477=140.0, 12.337686603263537=152.0, 13.454342644059444=167.0, " +
+      "14.672064691274752=182.0, 16.000000000000014=199.0, 17.448123722644137=217.0, 19.027313840043554=237.0, " +
+      "20.749432874416176=258.0, 22.627416997969544=282.0, 24.675373206527077=308.0, 26.908685288118896=336.0, " +
+      "29.34412938254951=367.0, 32.000000000000036=400.0, 34.89624744528828=435.0, 38.05462768008712=474.0, " +
+      "41.49886574883236=517.0, 45.254833995939094=565.0, 49.35074641305417=617.0, 53.8173705762378=672.0, " +
+      "58.688258765099036=732.0, 64.00000000000009=749.0"
+    val counts = str.split(", ").map { s =>
+      val kv = s.split("=")
+      kv(1).toDouble
+    }
+    val hist = MutableHistogram(bucketScheme, counts)
+    hist.quantile(0.5) shouldEqual 29.927691427444305 +- 0.00001
+    hist.quantile(0.99) shouldEqual 61.602904581469566 +- 0.00001
+    hist.quantile(0.01) shouldEqual 0.6916552392692796 +- 0.00001
+    hist.quantile(0.99, min=0.03, max=59.87) shouldEqual 59.34643429268522 +- 0.00001
+  }
+
   it("should calculate histogram_fraction correctly for exponential histograms using exponential interpolation") {
     val bucketScheme = Base2ExpHistogramBuckets(3, -5, 11) // 0.707 to 1.68
     val hist = MutableHistogram(bucketScheme, Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12))
@@ -194,6 +238,8 @@ class HistogramTest extends NativeVectorTest {
 
     val values3 = Array[Double](1, 1, 1, 1, 1, 4, 7, 7, 9, 9) ++ Array.fill(54)(12.0)
     val h3 = MutableHistogram(HistogramBuckets.binaryBuckets64, values3)
+    h3.quantile(0.99) shouldEqual 2006.0399999999995 +- 0.0001 // without min/max
+    h3.quantile(0.99, 0, 0) shouldEqual 2006.0399999999995 +- 0.0001 // with potentially wrong min max
     h3.quantile(0.99, 1.0, 1617.0) shouldEqual 1593.2 +- 0.1
     h3.quantile(0.90, 1.0, 1617.0) shouldEqual 1379.4 +- 0.1
     h3.quantile(0.01, 1.0, 1617.0) shouldEqual 1.0 +- 0.1 // must use the starting reference from min
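The last assertion in the real-data test shows the clamping at work. Worked through by hand (derived from the test data above, so treat the intermediate numbers as illustrative): 0.99 of the 749 total observations is rank 741.51, which falls in the (58.688..., 64.0] bucket holding cumulative counts 732 to 749. With max = 59.87 the bucket end is clamped before interpolating, so the result can never exceed the observed max:

    val bucketStart = 58.688258765099036
    val bucketEnd   = math.min(64.00000000000009, 59.87)        // max clamps the bucket end
    val fraction    = (741.51 - 732) / (749.0 - 732)            // ~0.559 within the bucket
    // exponential interpolation applies, since these are exponential buckets and start > 0:
    val q99 = bucketStart * math.pow(bucketEnd / bucketStart, fraction)   // ~59.346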
From 4aa3420f13854b1b8ac79f149f372708af393b60 Mon Sep 17 00:00:00 2001
From: Vish Ramachandran
Date: Thu, 21 Nov 2024 15:30:31 +0530
Subject: [PATCH 3/8] add SumAndMinMaxOverTimeH range function

---
 .../filodb/coordinator/ProtoConverters.scala  |  2 +
 grpc/src/main/protobuf/query_service.proto    |  1 +
 .../query/exec/InternalRangeFunction.scala    |  2 +
 .../exec/MultiSchemaPartitionsExec.scala      |  8 ++--
 .../exec/rangefn/AggrOverTimeFunctions.scala  | 44 +++++++++++++++++++
 .../query/exec/rangefn/RangeFunction.scala    |  3 ++
 6 files changed, 56 insertions(+), 4 deletions(-)

diff --git a/coordinator/src/main/scala/filodb/coordinator/ProtoConverters.scala b/coordinator/src/main/scala/filodb/coordinator/ProtoConverters.scala
index 32355df907..d182e01778 100644
--- a/coordinator/src/main/scala/filodb/coordinator/ProtoConverters.scala
+++ b/coordinator/src/main/scala/filodb/coordinator/ProtoConverters.scala
@@ -980,6 +980,7 @@ object ProtoConverters {
     case InternalRangeFunction.AvgWithSumAndCountOverTime =>
       GrpcMultiPartitionQueryService.InternalRangeFunction.AVG_WITH_SUM_AND_COUNT_OVER_TIME
     case InternalRangeFunction.SumAndMaxOverTime => GrpcMultiPartitionQueryService.InternalRangeFunction.SUM_AND_MAX_OVER_TIME
+    case InternalRangeFunction.RateAndMinMaxOverTime => GrpcMultiPartitionQueryService.InternalRangeFunction.RATE_AND_MIN_MAX_OVER_TIME
    case InternalRangeFunction.LastSampleHistMaxMin =>
      GrpcMultiPartitionQueryService.InternalRangeFunction.LAST_SAMPLE_HIST_MAX_MIN
     case InternalRangeFunction.Timestamp => GrpcMultiPartitionQueryService.InternalRangeFunction.TIME_STAMP
     case InternalRangeFunction.AbsentOverTime => GrpcMultiPartitionQueryService.InternalRangeFunction.ABSENT_OVER_TIME
@@ -1017,6 +1018,7 @@ object ProtoConverters {
     case GrpcMultiPartitionQueryService.InternalRangeFunction.AVG_WITH_SUM_AND_COUNT_OVER_TIME =>
       InternalRangeFunction.AvgWithSumAndCountOverTime
     case GrpcMultiPartitionQueryService.InternalRangeFunction.SUM_AND_MAX_OVER_TIME => InternalRangeFunction.SumAndMaxOverTime
+    case GrpcMultiPartitionQueryService.InternalRangeFunction.RATE_AND_MIN_MAX_OVER_TIME => InternalRangeFunction.RateAndMinMaxOverTime
     case GrpcMultiPartitionQueryService.InternalRangeFunction.LAST_SAMPLE_HIST_MAX_MIN => InternalRangeFunction.LastSampleHistMaxMin
     case GrpcMultiPartitionQueryService.InternalRangeFunction.TIME_STAMP => InternalRangeFunction.Timestamp
     case GrpcMultiPartitionQueryService.InternalRangeFunction.ABSENT_OVER_TIME => InternalRangeFunction.AbsentOverTime
diff --git a/grpc/src/main/protobuf/query_service.proto b/grpc/src/main/protobuf/query_service.proto
index ea58539491..499c887f4d 100644
--- a/grpc/src/main/protobuf/query_service.proto
+++ b/grpc/src/main/protobuf/query_service.proto
@@ -641,6 +641,7 @@ enum InternalRangeFunction {
   ABSENT_OVER_TIME = 26;
   PRESENT_OVER_TIME = 27;
   MEDIAN_ABSOLUTE_DEVIATION_OVER_TIME = 28;
+  RATE_AND_MIN_MAX_OVER_TIME = 29;
 }
 
 enum SortFunctionId {
diff --git a/query/src/main/scala/filodb/query/exec/InternalRangeFunction.scala b/query/src/main/scala/filodb/query/exec/InternalRangeFunction.scala
index d622d60f1a..210c88bb30 100644
--- a/query/src/main/scala/filodb/query/exec/InternalRangeFunction.scala
+++ b/query/src/main/scala/filodb/query/exec/InternalRangeFunction.scala
@@ -4,6 +4,7 @@ import enumeratum.EnumEntry
 
 import filodb.query.RangeFunctionId
 
+// scalastyle:off number.of.types
 // Used for internal representations of RangeFunctions
 sealed abstract class InternalRangeFunction(val onCumulCounter: Boolean = false) extends EnumEntry
 
@@ -55,6 +56,7 @@ object InternalRangeFunction {
 
   // Used only for histogram schemas with max column
   case object SumAndMaxOverTime extends InternalRangeFunction
+  case object RateAndMinMaxOverTime extends InternalRangeFunction
   case object LastSampleHistMaxMin extends InternalRangeFunction
 
   case object Timestamp extends InternalRangeFunction
diff --git a/query/src/main/scala/filodb/query/exec/MultiSchemaPartitionsExec.scala b/query/src/main/scala/filodb/query/exec/MultiSchemaPartitionsExec.scala
index 8e8f5b4412..155c337d55 100644
--- a/query/src/main/scala/filodb/query/exec/MultiSchemaPartitionsExec.scala
+++ b/query/src/main/scala/filodb/query/exec/MultiSchemaPartitionsExec.scala
@@ -128,10 +128,10 @@ final case class MultiSchemaPartitionsExec(queryContext: QueryContext,
       // This code is responsible for putting exact IDs needed by any range functions.
       val colIDs1 = getColumnIDs(sch, newColName.toSeq, rangeVectorTransformers)
-      val colIDs = isMaxMinColumnsEnabled(maxMinTenantFilter) match {
-        case true => addIDsForHistMaxMin(sch, colIDs1)
-        case _ => colIDs1
-      }
+      val colIDs = if (sch.data.columns.exists(_.name == "min") &&
+                       sch.data.columns.exists(_.name == "max") &&
+                       isMaxMinColumnsEnabled(maxMinTenantFilter)) addIDsForHistMaxMin(sch, colIDs1)
+                   else colIDs1
 
       // Modify transformers as needed for histogram w/ max, downsample, other schemas
       val newxformers1 = newXFormersForDownsample(sch, rangeVectorTransformers)
diff --git a/query/src/main/scala/filodb/query/exec/rangefn/AggrOverTimeFunctions.scala b/query/src/main/scala/filodb/query/exec/rangefn/AggrOverTimeFunctions.scala
index b434484aa6..1dd0ad02bf 100644
--- a/query/src/main/scala/filodb/query/exec/rangefn/AggrOverTimeFunctions.scala
+++ b/query/src/main/scala/filodb/query/exec/rangefn/AggrOverTimeFunctions.scala
@@ -366,6 +366,50 @@ class SumAndMaxOverTimeFuncHD(maxColID: Int) extends ChunkedRangeFunction[Transi
   }
 }
 
+class RateAndMinMaxOverTimeFuncHD(maxColId: Int, minColId: Int) extends ChunkedRangeFunction[TransientHistMaxMinRow] {
+  private val hFunc = new RateOverDeltaChunkedFunctionH
+  private val maxFunc = new MaxOverTimeChunkedFunctionD
+  private val minFunc = new MinOverTimeChunkedFunctionD
+
+  override final def reset(): Unit = {
+    hFunc.reset()
+    maxFunc.reset()
+    minFunc.reset()
+  }
+  final def apply(endTimestamp: Long, sampleToEmit: TransientHistMaxMinRow): Unit = {
+    sampleToEmit.setValues(endTimestamp, hFunc.h)
+    sampleToEmit.setDouble(2, maxFunc.max)
+    sampleToEmit.setDouble(3, minFunc.min)
+  }
+
+  import BinaryVector.BinaryVectorPtr
+
+  // scalastyle:off parameter.number
+  def addChunks(tsVectorAcc: MemoryReader, tsVector: BinaryVectorPtr, tsReader: bv.LongVectorDataReader,
+                valueVectorAcc: MemoryReader, valueVector: BinaryVectorPtr, valueReader: VectorDataReader,
+                startTime: Long, endTime: Long, info: ChunkSetInfoReader, queryConfig: QueryConfig): Unit = {
+    // Do BinarySearch for start/end pos only once for both columns == WIN!
+    val startRowNum = tsReader.binarySearch(tsVectorAcc, tsVector, startTime) & 0x7fffffff
+    val endRowNum = Math.min(tsReader.ceilingIndex(tsVectorAcc, tsVector, endTime), info.numRows - 1)
+
+    // At least one sample is present
+    if (startRowNum <= endRowNum) {
+      hFunc.addTimeChunks(valueVectorAcc, valueVector, valueReader, startRowNum, endRowNum)
+
+      // Get valueVector/reader for max column
+      val maxVectAcc = info.vectorAccessor(maxColId)
+      val maxVectPtr = info.vectorAddress(maxColId)
+      maxFunc.addTimeChunks(maxVectAcc, maxVectPtr, bv.DoubleVector(maxVectAcc, maxVectPtr), startRowNum, endRowNum)
+
+      // Get valueVector/reader for max column
+      val minVectAcc = info.vectorAccessor(minColId)
+      val minVectPtr = info.vectorAddress(minColId)
+      minFunc.addTimeChunks(minVectAcc, minVectPtr, bv.DoubleVector(minVectAcc, minVectPtr), startRowNum, endRowNum)
+    }
+  }
+}
+
+
 /**
  * Computes Average Over Time using sum and count columns.
  * Used when calculating avg_over_time using downsampled data
diff --git a/query/src/main/scala/filodb/query/exec/rangefn/RangeFunction.scala b/query/src/main/scala/filodb/query/exec/rangefn/RangeFunction.scala
index aecf277106..e63ceb1210 100644
--- a/query/src/main/scala/filodb/query/exec/rangefn/RangeFunction.scala
+++ b/query/src/main/scala/filodb/query/exec/rangefn/RangeFunction.scala
@@ -372,6 +372,7 @@ object RangeFunction {
     f match {
       case None => Some(LastSampleHistMaxMin)
       case Some(SumOverTime) => Some(SumAndMaxOverTime)
+      case Some(Rate) => Some(RateAndMinMaxOverTime)
       case other => other
     }
 
@@ -383,6 +384,8 @@ object RangeFunction {
       () => new LastSampleChunkedFunctionHMax(schema.colIDs(2), schema.colIDs(3))
     case Some(SumAndMaxOverTime) => require(schema.columns(2).name == "max")
       () => new SumAndMaxOverTimeFuncHD(schema.colIDs(2))
+    case Some(RateAndMinMaxOverTime) => require(schema.columns(2).name == "max" && schema.columns(3).name == "min")
+      () => new RateAndMinMaxOverTimeFuncHD(schema.colIDs(2), schema.colIDs(3))
     case Some(Last) => () => new LastSampleChunkedFunctionH
     case Some(SumOverTime) => () => new SumOverTimeChunkedFunctionH
     case Some(Rate) if schema.columns(1).isCumulative
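The wiring in PATCH 3/8 assumes the histogram max/min schema exposes column 2 named "max" and column 3 named "min", hence the require() checks and the colIDs(2)/colIDs(3) arguments. Conceptually, each emitted window is the fold sketched below; this is a self-contained illustration of the semantics, not the FiloDB types (which operate on off-heap chunks instead of Seq):

    // per window: the histogram rate sums per-sample deltas and divides by the window
    // length, while max/min simply fold over the companion max and min columns
    def foldWindow(deltas: Seq[Array[Double]], maxes: Seq[Double], mins: Seq[Double],
                   windowSecs: Double): (Array[Double], Double, Double) = {
      val rateBuckets = deltas.reduce((a, b) => a.zip(b).map { case (x, y) => x + y })
                              .map(_ / windowSecs)
      (rateBuckets, maxes.max, mins.min)
    }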
From bd17c10aaa7fecb5cf94bf7747cf52b9d126d32d Mon Sep 17 00:00:00 2001
From: Vish Ramachandran
Date: Sat, 23 Nov 2024 06:15:57 +0530
Subject: [PATCH 4/8] bug fix RateAndMinMaxOverTimeH range function

---
 .../filodb/query/exec/rangefn/AggrOverTimeFunctions.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/query/src/main/scala/filodb/query/exec/rangefn/AggrOverTimeFunctions.scala b/query/src/main/scala/filodb/query/exec/rangefn/AggrOverTimeFunctions.scala
index 1dd0ad02bf..9ee5dff507 100644
--- a/query/src/main/scala/filodb/query/exec/rangefn/AggrOverTimeFunctions.scala
+++ b/query/src/main/scala/filodb/query/exec/rangefn/AggrOverTimeFunctions.scala
@@ -376,11 +376,13 @@ class RateAndMinMaxOverTimeFuncHD(maxColId: Int, minColId: Int) extends ChunkedR
     maxFunc.reset()
     minFunc.reset()
   }
-  final def apply(endTimestamp: Long, sampleToEmit: TransientHistMaxMinRow): Unit = {
-    sampleToEmit.setValues(endTimestamp, hFunc.h)
+
+  override def apply(windowStart: Long, windowEnd: Long, sampleToEmit: TransientHistMaxMinRow): Unit = {
+    hFunc.apply(windowStart, windowEnd, sampleToEmit)
     sampleToEmit.setDouble(2, maxFunc.max)
     sampleToEmit.setDouble(3, minFunc.min)
   }
+  final def apply(endTimestamp: Long, sampleToEmit: TransientHistMaxMinRow): Unit = ??? // should not be invoked
 
   import BinaryVector.BinaryVectorPtr
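The bug fix in PATCH 4/8 matters because a rate needs the window length to normalize: the delegated RateOverDeltaChunkedFunctionH emits through the two-timestamp apply(windowStart, windowEnd, ...) overload, so the single-timestamp overload can no longer be the emission path. A sketch of the normalization assumed here (the divide-by-window-seconds semantics is inferred from the unit test in the next patch, which divides bucket sums by the window seconds):

    // assumed semantics: deltaSum accumulated over (windowStart, windowEnd]
    def ratePerSecond(deltaSum: Double, windowStartMs: Long, windowEndMs: Long): Double =
      deltaSum / ((windowEndMs - windowStartMs) / 1000.0)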
From 1b04f677c0ac2e3838f91339d1d161808c68c058 Mon Sep 17 00:00:00 2001
From: Vish Ramachandran
Date: Sun, 24 Nov 2024 21:00:54 +0530
Subject: [PATCH 5/8] unit test

---
 .../format/vectors/Histogram.scala            |  2 +-
 .../format/vectors/HistogramTest.scala        | 36 ++++++++++++++++++-
 .../rangefn/AggrOverTimeFunctionsSpec.scala   | 34 ++++++++++++++++++
 3 files changed, 70 insertions(+), 2 deletions(-)

diff --git a/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala b/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala
index ba068bbae8..b24128c123 100644
--- a/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala
+++ b/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala
@@ -84,7 +84,7 @@ trait Histogram extends Ordered[Histogram] {
       // now calculate quantile. If bucket is last one and last bucket is +Inf then return second-to-last bucket top
       // as we cannot interpolate to +Inf.
       if (bucket == numBuckets-1 && bucketTop(numBuckets - 1).isPosInfinity) {
-        return bucketTop(bucket-1)
+        return bucketTop(numBuckets-2)
       } else if (bucket == 0 && bucketTop(0) <= 0) {
         return bucketTop(0) // zero or negative bucket
       } else {
diff --git a/memory/src/test/scala/filodb.memory/format/vectors/HistogramTest.scala b/memory/src/test/scala/filodb.memory/format/vectors/HistogramTest.scala
index 91cfc66d25..0d322a1a13 100644
--- a/memory/src/test/scala/filodb.memory/format/vectors/HistogramTest.scala
+++ b/memory/src/test/scala/filodb.memory/format/vectors/HistogramTest.scala
@@ -11,6 +11,14 @@ object HistogramTest {
                   Array[Double](11, 16, 26, 27, 33, 42, 46, 55),
                   Array[Double](4, 4, 5, 33, 35, 67, 91, 121)
                 )
+
+  val rawHistBuckets2 = Seq(
+    Array[Double](0, 25, 17, 20, 25, 34, 76, 82),
+    Array[Double](0, 22, 26, 26, 36, 38, 56, 59),
+    Array[Double](0, 27, 26, 27, 33, 42, 46, 55),
+    Array[Double](0, 8, 5, 33, 35, 67, 91, 121)
+  )
+
   val rawLongBuckets = rawHistBuckets.map(_.map(_.toLong))
   val mutableHistograms = rawHistBuckets.map { buckets =>
     // Create a new copy here, so that mutations don't affect original array
@@ -26,7 +34,7 @@ object HistogramTest {
   }
 
   val otelExpBuckets = Base2ExpHistogramBuckets(3, -3, 7)
-  val otelExpHistograms = rawHistBuckets.map { buckets =>
+  val otelExpHistograms = rawHistBuckets2.map { buckets =>
     LongHistogram(otelExpBuckets, buckets.take(otelExpBuckets.numBuckets).map(_.toLong))
   }
 
@@ -245,6 +253,32 @@ class HistogramTest extends NativeVectorTest {
     h3.quantile(0.01, 1.0, 1617.0) shouldEqual 1.0 +- 0.1 // must use the starting reference from min
   }
 
+  it("should calculate more accurate quantile for otel exponential histogram using max and min column") {
+    // bucket boundaries for scale 3, range -3 to 3, 7 buckets
+    // zeroBucket: 0.0  -3: 0.840896, -2: 0.917004, -1: 1.000000, 0; 1.090508, 1: 1.189207, 2: 1.296840, 3: 1.414214
+    val exp95thWithoutMinMax = Seq(1.3329136609345256, 1.2987136172035367, 1.3772644080792311, 1.3897175222720994)
+    otelExpHistograms.map { h =>
+      h.quantile(0.95)
+    }.zip(exp95thWithoutMinMax).foreach { case (q, e) => q shouldEqual e +- 0.0001 }
+
+    // notice how the quantiles are calculated within the max now, unlike before
+    val exp95thWithMinMax = Seq(1.2978395301558558, 1.296892165727062, 1.2990334920692943, 1.2993620241156902)
+    otelExpHistograms.map { h =>
+      h.quantile(0.95, 0.78, 1.3) // notice 1.3 max is less than last bucket
+    }.zip(exp95thWithMinMax).foreach { case (q, e) => q shouldEqual e +- 0.0001 }
+
+    val exp5thWithoutMinMax = Seq(0.13790701210160922, 0.11275656477265718, 0.08564685710917462, 0.6359279140356217)
+    otelExpHistograms.map { h =>
+      h.quantile(0.05)
+    }.zip(exp5thWithoutMinMax).foreach { case (q, e) => q shouldEqual e +- 0.0001 }
+
+    // notice how the quantiles are calculated within the min now, unlike before
+    val exp5thWithMinMax = Seq(0.7896758526873408, 0.7879023377662122, 0.7859951235344664, 0.825628309567315)
+    otelExpHistograms.map { h =>
+      h.quantile(0.05, 0.78, 1.3) // notice 0.78 min is less than first non-zero bucket, but bigger than previous otel bucket if it existed
+    }.zip(exp5thWithMinMax).foreach { case (q, e) => q shouldEqual e +- 0.0001 }
+  }
+
   it("should serialize to and from BinaryHistograms and compare correctly") {
     val binHistograms = mutableHistograms.map { h =>
       val buf = new ExpandableArrayBuffer()
diff --git a/query/src/test/scala/filodb/query/exec/rangefn/AggrOverTimeFunctionsSpec.scala b/query/src/test/scala/filodb/query/exec/rangefn/AggrOverTimeFunctionsSpec.scala
index 0a71367863..d5479e09d9 100644
--- a/query/src/test/scala/filodb/query/exec/rangefn/AggrOverTimeFunctionsSpec.scala
+++ b/query/src/test/scala/filodb/query/exec/rangefn/AggrOverTimeFunctionsSpec.scala
@@ -17,6 +17,7 @@ import filodb.memory._
 import filodb.memory.data.ChunkMap
 import filodb.memory.format.{TupleRowReader, vectors => bv}
 import filodb.memory.BinaryRegion.NativePointer
+import filodb.memory.format.vectors.MutableHistogram
 import filodb.query.exec._
 
 /**
@@ -313,6 +314,39 @@ class AggrOverTimeFunctionsSpec extends RawDataWindowingSpec {
     }
   }
 
+  // Goal of this is to verify histogram rate functionality for different time windows.
+  // See PeriodicSampleMapperSpec for verifying integration of histograms with min and max
+  it("should aggregate both max/min and hist for rate when min/max is in schema") {
+    val (data, rv) = MMD.histMaxMinRV(defaultStartTS, pubFreq, 150, 8)
+    (0 until numIterations).foreach { x =>
+      val windowSize = rand.nextInt(50) + 10
+      val step = rand.nextInt(50) + 5
+      info(s"iteration $x windowSize=$windowSize step=$step")
+
+      val row = new TransientHistMaxMinRow()
+      val chunkedIt = chunkedWindowItHist(data, rv, new RateAndMinMaxOverTimeFuncHD(5, 4), windowSize, step, row)
+      chunkedIt.zip(data.sliding(windowSize, step)).foreach { case (aggRow, rawDataWindow) =>
+        val aggHist = aggRow.getHistogram(1)
+        val sumRawHist = rawDataWindow.map(_(3).asInstanceOf[bv.LongHistogram])
+          .foldLeft(emptyAggHist) { case (agg, h) => agg.add(h); agg }
+        aggHist.asInstanceOf[MutableHistogram].values   // what we got
+          .zip(sumRawHist.values.map(_/(windowSize-1)/10))   // expected
+          .foreach { case (a, b) =>
+            a shouldEqual b +- 0.0000001
+          }
+
+        val maxMax = rawDataWindow.map(_(5).asInstanceOf[Double])
+          .foldLeft(0.0) { case (agg, m) => Math.max(agg, m) }
+        aggRow.getDouble(2) shouldEqual maxMax
+
+        val minMin = rawDataWindow.map(_(4).asInstanceOf[Double])
+          .foldLeft(Double.MaxValue) { case (agg, m) => Math.min(agg, m) }
+        aggRow.getDouble(3) shouldEqual minMin
+
+      }
+    }
+  }
+
   it("should correctly aggregate min_over_time / max_over_time using both chunked and sliding iterators") {
     val data = (1 to 240).map(_.toDouble)
     val chunkSize = 40

From aff7f80a637f06ffa4f34681f3e544363173f505 Mon Sep 17 00:00:00 2001
From: Vish Ramachandran
Date: Mon, 25 Nov 2024 17:49:28 +0530
Subject: [PATCH 6/8] fix unit test

---
 .../format/vectors/HistogramTest.scala        | 28 ++++-----
 .../format/vectors/HistogramVectorTest.scala  | 57 +++++++++++++++++++
 2 files changed, 72 insertions(+), 13 deletions(-)

diff --git a/memory/src/test/scala/filodb.memory/format/vectors/HistogramTest.scala b/memory/src/test/scala/filodb.memory/format/vectors/HistogramTest.scala
index 0d322a1a13..13af94706b 100644
--- a/memory/src/test/scala/filodb.memory/format/vectors/HistogramTest.scala
+++ b/memory/src/test/scala/filodb.memory/format/vectors/HistogramTest.scala
@@ -13,10 +13,10 @@ object HistogramTest {
   )
 
   val rawHistBuckets2 = Seq(
-    Array[Double](0, 25, 17, 20, 25, 34, 76, 82),
-    Array[Double](0, 22, 26, 26, 36, 38, 56, 59),
-    Array[Double](0, 27, 26, 27, 33, 42, 46, 55),
-    Array[Double](0, 8, 5, 33, 35, 67, 91, 121)
+    Array[Double](0, 15, 17, 20, 25, 34, 76, 82),
+    Array[Double](0, 16, 26, 26, 36, 38, 56, 59),
+    Array[Double](0, 16, 26, 27, 33, 42, 46, 55),
+    Array[Double](0, 4, 5, 33, 35, 67, 91, 121)
   )
 
   val rawLongBuckets = rawHistBuckets.map(_.map(_.toLong))
@@ -256,27 +256,29 @@ class HistogramTest extends NativeVectorTest {
   it("should calculate more accurate quantile for otel exponential histogram using max and min column") {
     // bucket boundaries for scale 3, range -3 to 3, 7 buckets
     // zeroBucket: 0.0  -3: 0.840896, -2: 0.917004, -1: 1.000000, 0; 1.090508, 1: 1.189207, 2: 1.296840, 3: 1.414214
-    val exp95thWithoutMinMax = Seq(1.3329136609345256, 1.2987136172035367, 1.3772644080792311, 1.3897175222720994)
+    val expected95thWithoutMinMax = Seq(1.3329136609345256, 1.2987136172035367, 1.3772644080792311, 1.3897175222720994)
     otelExpHistograms.map { h =>
       h.quantile(0.95)
-    }.zip(exp95thWithoutMinMax).foreach { case (q, e) => q shouldEqual e +- 0.0001 }
+    }.zip(expected95thWithoutMinMax).foreach { case (q, e) => q shouldEqual e +- 0.0001 }
 
     // notice how the quantiles are calculated within the max now, unlike before
-    val exp95thWithMinMax = Seq(1.2978395301558558, 1.296892165727062, 1.2990334920692943, 1.2993620241156902)
+    val expected95thWithMinMax = Seq(1.2978395301558558, 1.296892165727062, 1.2990334920692943, 1.2993620241156902)
     otelExpHistograms.map { h =>
       h.quantile(0.95, 0.78, 1.3) // notice 1.3 max is less than last bucket
-    }.zip(exp95thWithMinMax).foreach { case (q, e) => q shouldEqual e +- 0.0001 }
+    }.zip(expected95thWithMinMax).foreach { case (q, e) => q shouldEqual e +- 0.0001 }
 
-    val exp5thWithoutMinMax = Seq(0.13790701210160922, 0.11275656477265718, 0.08564685710917462, 0.6359279140356217)
+    val expected5thWithoutMinMax = Seq(0.22984502016934866, 0.15504027656240363, 0.14452907137173218, 0.9199883517494387)
     otelExpHistograms.map { h =>
       h.quantile(0.05)
-    }.zip(exp5thWithoutMinMax).foreach { case (q, e) => q shouldEqual e +- 0.0001 }
+    }.zip(expected5thWithoutMinMax).foreach { case (q, e) => q shouldEqual e +- 0.0001 }
 
     // notice how the quantiles are calculated within the min now, unlike before
-    val exp5thWithMinMax = Seq(0.7896758526873408, 0.7879023377662122, 0.7859951235344664, 0.825628309567315)
+    val expected5thWithMinMax = Seq(0.7961930120386448, 0.7908863115573832, 0.7901434789531481, 0.9199883517494387)
     otelExpHistograms.map { h =>
-      h.quantile(0.05, 0.78, 1.3) // notice 0.78 min is less than first non-zero bucket, but bigger than previous otel bucket if it existed
+      // notice 0.78 min is less than first non-zero bucketTop,
+      // but bigger than previous otel bucketTop if it existed
+      h.quantile(0.05, 0.78, 1.3)
-    }.zip(exp5thWithMinMax).foreach { case (q, e) => q shouldEqual e +- 0.0001 }
+    }.zip(expected5thWithMinMax).foreach { case (q, e) => q shouldEqual e +- 0.0001 }
diff --git a/memory/src/test/scala/filodb.memory/format/vectors/HistogramVectorTest.scala b/memory/src/test/scala/filodb.memory/format/vectors/HistogramVectorTest.scala
index 58ab343f25..e194a025aa 100644
--- a/memory/src/test/scala/filodb.memory/format/vectors/HistogramVectorTest.scala
+++ b/memory/src/test/scala/filodb.memory/format/vectors/HistogramVectorTest.scala
@@ -87,6 +87,63 @@ class HistogramVectorTest extends NativeVectorTest {
     }
   }
 
+  // This test confirms that we can store 1-minutely exponential histogram samples of 160 buckets
+  // easily in a histogram appender / write buffer, and we can still hold samples for 1-hr flush interval
+  it("should be able to store sufficient hist samples in one 15k byte vector, the histogram appender default size") {
+    val str = "0.0=0.0, 0.0012664448775888738=0.0, 0.0013810679320049727=0.0, 0.001506065259187439=0.0, " +
+      "0.0016423758110424079=0.0, 0.0017910235218841198=0.0, 0.001953124999999996=0.0, 0.002129897915361827=0.0, " +
+      "0.002322670146489685=0.0, 0.0025328897551777484=0.0, 0.002762135864009946=0.0, 0.0030121305183748786=0.0, " +
+      "0.0032847516220848166=0.0, 0.0035820470437682404=0.0, 0.003906249999999993=0.0, 0.004259795830723655=0.0, " +
+      "0.004645340292979371=0.0, 0.005065779510355498=0.0, 0.005524271728019893=0.0, 0.006024261036749759=0.0, " +
+      "0.006569503244169634=0.0, 0.007164094087536483=0.0, 0.007812499999999988=0.0, 0.008519591661447312=0.0, " +
+      "0.009290680585958744=0.0, 0.010131559020710997=0.0, 0.011048543456039788=0.0, 0.012048522073499521=0.0, " +
+      "0.013139006488339272=0.0, 0.014328188175072969=0.0, 0.01562499999999998=0.0, 0.017039183322894627=0.0, " +
+      "0.018581361171917492=0.0, 0.020263118041422=0.0, 0.022097086912079584=0.0, 0.024097044146999046=0.0, " +
+      "0.026278012976678547=0.0, 0.028656376350145944=0.0, 0.031249999999999965=0.0, 0.03407836664578927=0.0, " +
+      "0.037162722343835=0.0, 0.04052623608284401=0.0, 0.044194173824159175=0.0, 0.0481940882939981=0.0, " +
+      "0.05255602595335711=0.0, 0.0573127527002919=0.0, 0.062499999999999944=0.0, 0.06815673329157855=0.0, " +
+      "0.07432544468767001=0.0, 0.08105247216568803=0.0, 0.08838834764831838=0.0, 0.09638817658799623=0.0, " +
+      "0.10511205190671424=0.0, 0.11462550540058382=0.0, 0.12499999999999992=0.0, 0.13631346658315713=1.0, " +
+      "0.14865088937534005=1.0, 0.16210494433137612=1.0, 0.17677669529663678=1.0, 0.1927763531759925=1.0, " +
+      "0.21022410381342854=1.0, 0.2292510108011677=1.0, 0.2499999999999999=2.0, 0.2726269331663143=2.0, " +
+      "0.29730177875068015=3.0, 0.3242098886627523=3.0, 0.3535533905932736=3.0, 0.3855527063519851=3.0, " +
+      "0.42044820762685714=4.0, 0.4585020216023355=5.0, 0.4999999999999999=5.0, 0.5452538663326287=5.0, " +
+      "0.5946035575013604=6.0, 0.6484197773255047=6.0, 0.7071067811865475=8.0, 0.7711054127039704=8.0, " +
+      "0.8408964152537145=9.0, 0.9170040432046712=9.0, 1.0=11.0, 1.0905077326652577=12.0, 1.189207115002721=14.0, " +
+      "1.2968395546510099=15.0, 1.4142135623730951=17.0, 1.542210825407941=19.0, 1.6817928305074294=20.0, " +
+      "1.8340080864093429=22.0, 2.0000000000000004=23.0, 2.181015465330516=26.0, 2.378414230005443=28.0, " +
+      "2.59367910930202=31.0, 2.8284271247461907=34.0, 3.084421650815883=37.0, 3.3635856610148593=41.0, " +
+      "3.6680161728186866=45.0, 4.000000000000002=48.0, 4.3620309306610325=53.0, 4.756828460010887=58.0, " +
+      "5.187358218604041=64.0, 5.656854249492383=70.0, 6.168843301631767=76.0, 6.7271713220297205=84.0, " +
+      "7.336032345637374=90.0, 8.000000000000005=99.0, 8.724061861322067=108.0, 9.513656920021775=118.0, " +
+      "10.374716437208086=129.0, 11.31370849898477=140.0, 12.337686603263537=152.0, 13.454342644059444=167.0, " +
+      "14.672064691274752=182.0, 16.000000000000014=199.0, 17.448123722644137=217.0, 19.027313840043554=237.0, " +
+      "20.749432874416176=258.0, 22.627416997969544=282.0, 24.675373206527077=308.0, 26.908685288118896=336.0, " +
+      "29.34412938254951=367.0, 32.000000000000036=400.0, 34.89624744528828=435.0, 38.05462768008712=474.0, " +
+      "41.49886574883236=517.0, 45.254833995939094=565.0, 49.35074641305417=617.0, 53.8173705762378=672.0, " +
+      "58.688258765099036=732.0, 64.00000000000009=749.0"
+
+    val appender = HistogramVector.appending(memFactory, 15000) // 15k bytes is default blob size
+    val bucketScheme = Base2ExpHistogramBuckets(3, -78, 126)
+    var counts = str.split(", ").map { s =>
+      val kv = s.split("=")
+      kv(1).toDouble.toLong
+    }
+
+    (0 until 206).foreach { buckets =>
+      val hist = LongHistogram(bucketScheme, counts)
+      hist.serialize(Some(buffer))
+      if (buckets < 205) appender.addData(buffer) shouldEqual Ack
+      // should fail for 206th histogram because it exceeds the size of the write buffer
+      if (buckets >= 205) appender.addData(buffer) shouldEqual VectorTooSmall(73,46)
+      counts = counts.map(_ + 10)
+    }
+
+    val reader = appender.reader.asInstanceOf[RowHistogramReader]
+    reader.length shouldEqual 205
+
+  }
+
   it("should accept MutableHistograms with otel exp scheme and query them back") {
     val appender = HistogramVector.appending(memFactory, 1024)
     val mutHistograms = otelExpHistograms.map(h => MutableHistogram(h.buckets, h.valueArray))
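The 205-sample figure in the test is consistent with simple arithmetic: 15000 bytes divided by 205 samples is roughly 73 bytes per serialized 126-bucket histogram (delta-encoded against the previous sample), which is also the size that shows up in the VectorTooSmall(73,46) rejection. A back-of-envelope check in the same spirit (illustrative numbers taken from the test above):

    val bufferBytes = 15000
    val samplesHeld = 205
    val bytesPerSample = bufferBytes / samplesHeld    // ~73 bytes per 126-bucket sample
    val oneHourAtOneMinute = 60                       // samples needed per 1-hr flush interval
    assert(samplesHeld >= oneHourAtOneMinute)         // comfortably fits, as the comment claims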
From 82a40ca10e6df683d5f769702293e94c071f16ac Mon Sep 17 00:00:00 2001
From: Vish Ramachandran
Date: Thu, 28 Nov 2024 14:43:56 +0530
Subject: [PATCH 7/8] jmh fixes

---
 .../Base2ExponentialHistogramQueryBenchmark.scala | 13 ++++++++-----
 .../filodb.memory/format/vectors/Histogram.scala  |  3 ++-
 run_benchmarks.sh                                 | 12 +++++++++---
 3 files changed, 19 insertions(+), 9 deletions(-)

diff --git a/jmh/src/main/scala/filodb.jmh/Base2ExponentialHistogramQueryBenchmark.scala b/jmh/src/main/scala/filodb.jmh/Base2ExponentialHistogramQueryBenchmark.scala
index b00038cf07..03a35c745f 100644
--- a/jmh/src/main/scala/filodb.jmh/Base2ExponentialHistogramQueryBenchmark.scala
+++ b/jmh/src/main/scala/filodb.jmh/Base2ExponentialHistogramQueryBenchmark.scala
@@ -34,7 +34,7 @@ import filodb.timeseries.TestTimeseriesProducer
  */
 @State(Scope.Thread)
 class Base2ExponentialHistogramQueryBenchmark extends StrictLogging {
-  org.slf4j.LoggerFactory.getLogger("filodb").asInstanceOf[Logger].setLevel(Level.DEBUG)
+  org.slf4j.LoggerFactory.getLogger("filodb").asInstanceOf[Logger].setLevel(Level.WARN)
 
   import filodb.coordinator._
   import client.Client.{actorAsk, asyncAsk}
@@ -46,7 +46,7 @@ class Base2ExponentialHistogramQueryBenchmark extends StrictLogging {
   // TODO: move setup and ingestion to another trait
   val system = ActorSystem("test", ConfigFactory.load("filodb-defaults.conf")
-    .withValue("filodb.memstore.ingestion-buffer-mem-size", ConfigValueFactory.fromAnyRef("30MB")))
+    .withValue("filodb.memstore.ingestion-buffer-mem-size", ConfigValueFactory.fromAnyRef("300MB")))
 
   private val cluster = FilodbCluster(system)
   cluster.join()
@@ -90,11 +90,12 @@ class Base2ExponentialHistogramQueryBenchmark extends StrictLogging {
   val (producingFut, containerStream) = TestTimeseriesProducer.metricsToContainerStream(startTime, numShards, numSeries,
     numMetricNames = 1, numSamplesPerTs * numSeries, dataset, shardMapper, spread,
     publishIntervalSec = 10, numBuckets = numBuckets, expHist = true)
+  val endTime = startTime + (numSamplesPerTs * 10000)
   val ingestTask = containerStream.groupBy(_._1)
     // Asynchronously subscribe and ingest each shard
     .mapParallelUnordered(numShards) { groupedStream =>
       val shard = groupedStream.key
-      println(s"Starting ingest exp histograms on shard $shard...")
+      println(s"Starting ingest exp histograms on shard $shard from timestamp $startTime to $endTime")
       val shardStream = groupedStream.zipWithIndex.flatMap { case ((_, bytes), idx) =>
         val data = bytes.map { array => SomeData(RecordContainer(array), idx) }
         Observable.fromIterable(data)
@@ -117,13 +118,15 @@ class Base2ExponentialHistogramQueryBenchmark extends StrictLogging {
   val histQuantileQuery =
     """histogram_quantile(0.7, sum(rate(http_request_latency_delta{_ws_="demo", _ns_="App-0"}[5m])))"""
   val queries = Seq(histQuantileQuery)
-  val queryTime = startTime + (7 * 60 * 1000)  // 5 minutes from start until 60 minutes from start
+  val queryStartTime = startTime/1000 + (5 * 60)  // 5 minutes from start until 60 minutes from start
   val queryStep = 120 // # of seconds between each query sample "step"
-  val qParams = TimeStepParams(queryTime/1000, queryStep, (queryTime/1000) + queryIntervalMin*60)
+  val queryEndTime = queryStartTime + queryIntervalMin*60
+  val qParams = TimeStepParams(queryStartTime, queryStep, queryEndTime)
   val logicalPlans = queries.map { q => Parser.queryRangeToLogicalPlan(q, qParams) }
   val queryCommands = logicalPlans.map { plan =>
     LogicalPlan2Query(dataset.ref, plan, QueryContext(Some(new StaticSpreadProvider(SpreadChange(0, spread))), 20000))
   }
+  println(s"Querying data from $queryStartTime to $queryEndTime")
 
   var queriesSucceeded = 0
   var queriesFailed = 0
diff --git a/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala b/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala
index b24128c123..53a803a0c6 100644
--- a/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala
+++ b/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala
@@ -594,7 +594,8 @@ final case class GeometricBuckets(firstBucket: Double,
 
 object Base2ExpHistogramBuckets {
   // TODO: make maxBuckets default configurable; not straightforward to get handle to global config from here
-  val maxBuckets = 200
+  // see PR for benchmark test results based on which maxBuckets was fixed. Dont increase without analysis.
+  val maxBuckets = 180
   val maxAbsScale = 100
   val maxAbsBucketIndex = 500
 }
diff --git a/run_benchmarks.sh b/run_benchmarks.sh
index 458898ca25..2432976a5a 100755
--- a/run_benchmarks.sh
+++ b/run_benchmarks.sh
@@ -1,6 +1,11 @@
 #!/bin/bash
-sbt -Drust.optimize=true "jmh/jmh:run -rf json -i 5 -wi 3 -f 1 -jvmArgsAppend -XX:MaxInlineLevel=20 \
-  -jvmArgsAppend -Xmx4g -jvmArgsAppend -XX:MaxInlineSize=99 -jvmArgsAppend -Dkamon.enabled=false \
+
+sbt -Drust.optimize=true "jmh/jmh:run -rf json -i 2 -wi 2 -f 1 \
+  -jvmArgsAppend -Dlogback.configurationFile=../conf/logback-perf.xml \
+  -jvmArgsAppend -XX:MaxInlineLevel=20 \
+  -jvmArgsAppend -Xmx4g \
+  -jvmArgsAppend -XX:MaxInlineSize=99 \
+  -jvmArgsAppend -Dkamon.enabled=false \
   filodb.jmh.Base2ExponentialHistogramQueryBenchmark \
   filodb.jmh.QueryHiCardInMemoryBenchmark \
   filodb.jmh.QueryInMemoryBenchmark \
@@ -11,4 +16,5 @@ sbt -Drust.optimize=true "jmh/jmh:run -rf json -i 5 -wi 3 -f 1 -jvmArgsAppend -X
   filodb.jmh.PartKeyLuceneIndexBenchmark \
   filodb.jmh.PartKeyTantivyIndexBenchmark"
 
-# -prof 'async:libPath=/path/to/async-profiler-3.0-macos/lib/libasyncProfiler.dylib;event=cpu;output=flamegraphdir=./profile-results' \
+# Add below argument to enable profiling
+# -prof \"async:libPath=/path/to/async-profiler-3.0-macos/lib/libasyncProfiler.dylib;event=cpu;output=flamegraph;dir=./profile-results\" \
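The benchmark change lines the query window up with ingestion explicitly: samples are published every 10 seconds, so ingestion covers startTime through startTime + numSamplesPerTs * 10000 ms, while queries run from 5 minutes after the start (in seconds) for queryIntervalMin minutes. A sketch of the alignment, with assumed example values since the benchmark's actual constants are not shown in this diff:

    val startTimeMs = 1700000000000L       // assumed example value
    val numSamplesPerTs = 720              // assumed: 2 hours at a 10s publish interval
    val endTimeMs = startTimeMs + numSamplesPerTs * 10000L
    val queryStartSecs = startTimeMs / 1000 + 5 * 60
    val queryIntervalMin = 60              // assumed example value
    val queryEndSecs = queryStartSecs + queryIntervalMin * 60
    // queryEndSecs should stay within endTimeMs/1000 so the 5m rate windows find data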
From 17812e28ba22f3158337545bebd128113aa54a05 Mon Sep 17 00:00:00 2001
From: Vish Ramachandran
Date: Tue, 3 Dec 2024 09:05:14 +0530
Subject: [PATCH 8/8] PR comments

---
 .../main/scala/filodb.memory/format/vectors/Histogram.scala      | 2 +-
 .../filodb/query/exec/rangefn/AggrOverTimeFunctions.scala        | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala b/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala
index 53a803a0c6..d1ba20463e 100644
--- a/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala
+++ b/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala
@@ -83,7 +83,7 @@ trait Histogram extends Ordered[Histogram] {
       // now calculate quantile. If bucket is last one and last bucket is +Inf then return second-to-last bucket top
       // as we cannot interpolate to +Inf.
-      if (bucket == numBuckets-1 && bucketTop(numBuckets - 1).isPosInfinity) {
+      if (bucket == numBuckets-1 && bucketEnd.isPosInfinity) {
         return bucketTop(numBuckets-2)
       } else if (bucket == 0 && bucketTop(0) <= 0) {
         return bucketTop(0) // zero or negative bucket
       } else {
diff --git a/query/src/main/scala/filodb/query/exec/rangefn/AggrOverTimeFunctions.scala b/query/src/main/scala/filodb/query/exec/rangefn/AggrOverTimeFunctions.scala
index 9ee5dff507..ff39cbff21 100644
--- a/query/src/main/scala/filodb/query/exec/rangefn/AggrOverTimeFunctions.scala
+++ b/query/src/main/scala/filodb/query/exec/rangefn/AggrOverTimeFunctions.scala
@@ -390,7 +390,7 @@ class RateAndMinMaxOverTimeFuncHD(maxColId: Int, minColId: Int) extends ChunkedR
   def addChunks(tsVectorAcc: MemoryReader, tsVector: BinaryVectorPtr, tsReader: bv.LongVectorDataReader,
                 valueVectorAcc: MemoryReader, valueVector: BinaryVectorPtr, valueReader: VectorDataReader,
                 startTime: Long, endTime: Long, info: ChunkSetInfoReader, queryConfig: QueryConfig): Unit = {
-    // Do BinarySearch for start/end pos only once for both columns == WIN!
+    // Do BinarySearch for start/end pos only once for all columns == WIN!
     val startRowNum = tsReader.binarySearch(tsVectorAcc, tsVector, startTime) & 0x7fffffff
     val endRowNum = Math.min(tsReader.ceilingIndex(tsVectorAcc, tsVector, endTime), info.numRows - 1)
 
@@ -403,7 +403,7 @@ class RateAndMinMaxOverTimeFuncHD(maxColId: Int, minColId: Int) extends ChunkedR
       val maxVectPtr = info.vectorAddress(maxColId)
       maxFunc.addTimeChunks(maxVectAcc, maxVectPtr, bv.DoubleVector(maxVectAcc, maxVectPtr), startRowNum, endRowNum)
 
-      // Get valueVector/reader for max column
+      // Get valueVector/reader for min column
       val minVectAcc = info.vectorAccessor(minColId)
       val minVectPtr = info.vectorAddress(minColId)
       minFunc.addTimeChunks(minVectAcc, minVectPtr, bv.DoubleVector(minVectAcc, minVectPtr), startRowNum, endRowNum)
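A subtlety worth spelling out in the final Histogram.scala change: the +Inf check now tests bucketEnd, which has already been clamped by max, rather than the raw bucketTop. So when a max hint is present, the top +Inf bucket gets a finite end and falls through to normal interpolation instead of snapping to the second-to-last bucket top. A sketch of the two behaviors:

    // before (as of PATCH 5/8): a last bucket with top = +Inf always short-circuits:
    //   if (bucket == numBuckets-1 && bucketTop(numBuckets - 1).isPosInfinity)
    //     return bucketTop(numBuckets-2)
    // after (PATCH 8/8): a max hint turns +Inf into a usable upper bound
    val rawTop = Double.PositiveInfinity
    val max = 90.0
    val bucketEnd = math.min(rawTop, max)   // 90.0, no longer infinite
    // bucketEnd.isPosInfinity is false, so the quantile interpolates up toward max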