Skip to content

Commit

Permalink
feat(core): Min-max versions of histogram quantile and fraction for e…
Browse files Browse the repository at this point in the history
…xp histograms (#1891)

* Use min and max when available to calculate better quantiles and fractions.
* Fix perf benchmarks with updated results
* Calculate rate correctly along with min and max when doing sum(rate(histogram))
  • Loading branch information
vishramachandran authored Dec 3, 2024
1 parent a39f612 commit 4ec845a
Show file tree
Hide file tree
Showing 13 changed files with 313 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,7 @@ object ProtoConverters {
case InternalRangeFunction.AvgWithSumAndCountOverTime =>
GrpcMultiPartitionQueryService.InternalRangeFunction.AVG_WITH_SUM_AND_COUNT_OVER_TIME
case InternalRangeFunction.SumAndMaxOverTime => GrpcMultiPartitionQueryService.InternalRangeFunction.SUM_AND_MAX_OVER_TIME
case InternalRangeFunction.RateAndMinMaxOverTime => GrpcMultiPartitionQueryService.InternalRangeFunction.RATE_AND_MIN_MAX_OVER_TIME
case InternalRangeFunction.LastSampleHistMaxMin => GrpcMultiPartitionQueryService.InternalRangeFunction.LAST_SAMPLE_HIST_MAX_MIN
case InternalRangeFunction.Timestamp => GrpcMultiPartitionQueryService.InternalRangeFunction.TIME_STAMP
case InternalRangeFunction.AbsentOverTime => GrpcMultiPartitionQueryService.InternalRangeFunction.ABSENT_OVER_TIME
Expand Down Expand Up @@ -1017,6 +1018,7 @@ object ProtoConverters {
case GrpcMultiPartitionQueryService.InternalRangeFunction.AVG_WITH_SUM_AND_COUNT_OVER_TIME =>
InternalRangeFunction.AvgWithSumAndCountOverTime
case GrpcMultiPartitionQueryService.InternalRangeFunction.SUM_AND_MAX_OVER_TIME => InternalRangeFunction.SumAndMaxOverTime
case GrpcMultiPartitionQueryService.InternalRangeFunction.RATE_AND_MIN_MAX_OVER_TIME => InternalRangeFunction.RateAndMinMaxOverTime
case GrpcMultiPartitionQueryService.InternalRangeFunction.LAST_SAMPLE_HIST_MAX_MIN => InternalRangeFunction.LastSampleHistMaxMin
case GrpcMultiPartitionQueryService.InternalRangeFunction.TIME_STAMP => InternalRangeFunction.Timestamp
case GrpcMultiPartitionQueryService.InternalRangeFunction.ABSENT_OVER_TIME => InternalRangeFunction.AbsentOverTime
Expand Down
1 change: 1 addition & 0 deletions grpc/src/main/protobuf/query_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,7 @@ enum InternalRangeFunction {
ABSENT_OVER_TIME = 26;
PRESENT_OVER_TIME = 27;
MEDIAN_ABSOLUTE_DEVIATION_OVER_TIME = 28;
RATE_AND_MIN_MAX_OVER_TIME = 29;
}

enum SortFunctionId {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import filodb.timeseries.TestTimeseriesProducer
*/
@State(Scope.Thread)
class Base2ExponentialHistogramQueryBenchmark extends StrictLogging {
org.slf4j.LoggerFactory.getLogger("filodb").asInstanceOf[Logger].setLevel(Level.DEBUG)
org.slf4j.LoggerFactory.getLogger("filodb").asInstanceOf[Logger].setLevel(Level.WARN)

import filodb.coordinator._
import client.Client.{actorAsk, asyncAsk}
Expand All @@ -46,7 +46,7 @@ class Base2ExponentialHistogramQueryBenchmark extends StrictLogging {

// TODO: move setup and ingestion to another trait
val system = ActorSystem("test", ConfigFactory.load("filodb-defaults.conf")
.withValue("filodb.memstore.ingestion-buffer-mem-size", ConfigValueFactory.fromAnyRef("30MB")))
.withValue("filodb.memstore.ingestion-buffer-mem-size", ConfigValueFactory.fromAnyRef("300MB")))

private val cluster = FilodbCluster(system)
cluster.join()
Expand Down Expand Up @@ -90,11 +90,12 @@ class Base2ExponentialHistogramQueryBenchmark extends StrictLogging {
val (producingFut, containerStream) = TestTimeseriesProducer.metricsToContainerStream(startTime, numShards, numSeries,
numMetricNames = 1, numSamplesPerTs * numSeries, dataset, shardMapper, spread,
publishIntervalSec = 10, numBuckets = numBuckets, expHist = true)
val endTime = startTime + (numSamplesPerTs * 10000)
val ingestTask = containerStream.groupBy(_._1)
// Asynchronously subscribe and ingest each shard
.mapParallelUnordered(numShards) { groupedStream =>
val shard = groupedStream.key
println(s"Starting ingest exp histograms on shard $shard...")
println(s"Starting ingest exp histograms on shard $shard from timestamp $startTime to $endTime")
val shardStream = groupedStream.zipWithIndex.flatMap { case ((_, bytes), idx) =>
val data = bytes.map { array => SomeData(RecordContainer(array), idx) }
Observable.fromIterable(data)
Expand All @@ -117,13 +118,15 @@ class Base2ExponentialHistogramQueryBenchmark extends StrictLogging {
val histQuantileQuery =
"""histogram_quantile(0.7, sum(rate(http_request_latency_delta{_ws_="demo", _ns_="App-0"}[5m])))"""
val queries = Seq(histQuantileQuery)
val queryTime = startTime + (7 * 60 * 1000) // 5 minutes from start until 60 minutes from start
val queryStartTime = startTime/1000 + (5 * 60) // 5 minutes from start until 60 minutes from start
val queryStep = 120 // # of seconds between each query sample "step"
val qParams = TimeStepParams(queryTime/1000, queryStep, (queryTime/1000) + queryIntervalMin*60)
val queryEndTime = queryStartTime + queryIntervalMin*60
val qParams = TimeStepParams(queryStartTime, queryStep, queryEndTime)
val logicalPlans = queries.map { q => Parser.queryRangeToLogicalPlan(q, qParams) }
val queryCommands = logicalPlans.map { plan =>
LogicalPlan2Query(dataset.ref, plan, QueryContext(Some(new StaticSpreadProvider(SpreadChange(0, spread))), 20000))
}
println(s"Querying data from $queryStartTime to $queryEndTime")

var queriesSucceeded = 0
var queriesFailed = 0
Expand Down
99 changes: 30 additions & 69 deletions memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,28 +62,36 @@ trait Histogram extends Ordered[Histogram] {
/**
* Calculates histogram quantile based on bucket values using Prometheus scheme (increasing/LE)
*/
def quantile(q: Double): Double = {
def quantile(q: Double,
min: Double = 0, // negative observations not supported yet
max: Double = Double.PositiveInfinity): Double = {
val result = if (q < 0) Double.NegativeInfinity
else if (q > 1) Double.PositiveInfinity
else if (numBuckets < 2) Double.NaN
else if (numBuckets < 2 || topBucketValue <= 0) Double.NaN
else {
// find rank for the quantile using total number of occurrences (which is the last bucket value)
var rank = q * topBucketValue
// using rank, find the le bucket which would have the identified rank
val b = firstBucketGTE(rank)
val bucket = firstBucketGTE(rank)

// current bucket lower and upper bound; negative observations not supported yet - to be done later
var bucketStart = if (bucket == 0) 0 else bucketTop(bucket-1)
var bucketEnd = bucketTop(bucket)
// if min and/or max fall within this bucket, tighten the bucket start and/or end independently
if (min > bucketStart && min <= bucketEnd) bucketStart = min
if (max > bucketStart && max <= bucketEnd) bucketEnd = max

// now calculate quantile. If bucket is last one and last bucket is +Inf then return second-to-last bucket top
// as we cannot interpolate to +Inf.
if (b == numBuckets-1 && bucketTop(numBuckets - 1).isPosInfinity) return bucketTop(numBuckets-2)
else if (b == 0 && bucketTop(0) <= 0) return bucketTop(0)
else {
// interpolate quantile within le bucket
var (bucketStart, bucketEnd, count) = (0d, bucketTop(b), bucketValue(b))
if (b > 0) {
bucketStart = bucketTop(b-1)
count -= bucketValue(b-1)
rank -= bucketValue(b-1)
}
if (bucket == numBuckets-1 && bucketEnd.isPosInfinity) {
return bucketTop(numBuckets-2)
} else if (bucket == 0 && bucketTop(0) <= 0) {
return bucketTop(0) // zero or negative bucket
} else {

// interpolate quantile within boundaries of "bucket"
val count = if (bucket == 0) bucketValue(bucket) else bucketValue(bucket) - bucketValue(bucket-1)
rank -= (if (bucket == 0) 0 else bucketValue(bucket-1))
val fraction = rank/count
if (!hasExponentialBuckets || bucketStart == 0) {
bucketStart + (bucketEnd-bucketStart) * fraction
Expand Down Expand Up @@ -133,9 +141,9 @@ trait Histogram extends Ordered[Histogram] {
val b = it.next()
val zeroBucket = (b == 0)
val bucketUpper = bucketTop(b)
val bucketLower = if (b == 0) 0.0 else bucketTop(b - 1)
val bucketLower = if (zeroBucket) 0.0 else bucketTop(b - 1)
val bucketVal = bucketValue(b)
val prevBucketVal = if (b == 0) 0.0 else bucketValue(b - 1)
val prevBucketVal = if (zeroBucket) 0.0 else bucketValue(b - 1)

// Define interpolation functions
def interpolateLinearly(v: Double): Double = {
Expand Down Expand Up @@ -264,7 +272,6 @@ final case class LongHistogram(var buckets: HistogramBuckets, var values: Array[
}
// TODO: for OTel histograms, the values need to be added in a different way
// see if we can refactor since MutableHistogram also has this logic
assert(other.buckets == buckets)
cforRange { 0 until numBuckets } { b =>
values(b) += other.values(b)
}
Expand Down Expand Up @@ -433,54 +440,6 @@ object MutableHistogram {
}
}

/**
* MaxMinHistogram improves quantile calculation accuracy with a known max value recorded from the client.
* Whereas normally Prom histograms have +Inf as the highest bucket, and we cannot interpolate above the last
* non-Inf bucket, having a max allows us to interpolate from the rank up to the max.
* When the max value is lower, we interpolate between the bottom of the bucket and the max value.
* Both changes mean that the 0.90+ quantiles return much closer to the max value, instead of interpolating or clipping.
* The quantile result can never be above max, regardless of the bucket scheme.
*
* UPDATE: Adding support for min value as well to make sure the quantile is never below the minimum specified value.
* By default, the minimum value is set to 0.0
*/
final case class MaxMinHistogram(innerHist: HistogramWithBuckets, max: Double, min: Double = 0.0)
extends HistogramWithBuckets {
final def buckets: HistogramBuckets = innerHist.buckets
final def bucketValue(no: Int): Double = innerHist.bucketValue(no)

def serialize(intoBuf: Option[MutableDirectBuffer] = None): MutableDirectBuffer = ???

override def quantile(q: Double): Double = {
val result = if (q < 0) Double.NegativeInfinity
else if (q > 1) Double.PositiveInfinity
else if (numBuckets < 2) Double.NaN
else {
// find rank for the quantile using total number of occurrences (which is the last bucket value)
var rank = q * topBucketValue
// using rank, find the le bucket which would have the identified rank
val bucketNum = firstBucketGTE(rank)

// now calculate quantile. No need to special case top bucket since we will always cap top at max
if (bucketNum == 0 && bucketTop(0) <= 0) return bucketTop(0)
else {
// interpolate quantile within le bucket
var (bucketStart, bucketEnd, count) = (Math.max(0d, min),
Math.min(bucketTop(bucketNum), max), bucketValue(bucketNum))
if (bucketNum > 0) {
bucketStart = bucketTop(bucketNum-1)
count -= bucketValue(bucketNum-1)
rank -= bucketValue(bucketNum-1)
}
bucketStart + (bucketEnd-bucketStart)*(rank/count)
}
}
result
}


}

/**
* A scheme for buckets in a histogram. Since these are Prometheus-style histograms,
* each bucket definition consists of occurrences of numbers which are less than or equal to the bucketTop
Expand Down Expand Up @@ -635,7 +594,8 @@ final case class GeometricBuckets(firstBucket: Double,

object Base2ExpHistogramBuckets {
// TODO: make maxBuckets default configurable; not straightforward to get handle to global config from here
val maxBuckets = 200
// See PR for benchmark test results based on which maxBuckets was fixed. Don't increase without analysis.
val maxBuckets = 180
val maxAbsScale = 100
val maxAbsBucketIndex = 500
}
Expand Down Expand Up @@ -746,11 +706,12 @@ final case class Base2ExpHistogramBuckets(scale: Int,
val maxBucketTopNeeded = Math.max(endBucketTop, o.endBucketTop)
var newScale = Math.min(scale, o.scale)
var newBase = Math.max(base, o.base)
var newBucketIndexEnd = Math.ceil(Math.log(maxBucketTopNeeded) / Math.log(newBase)).toInt - 1
var newBucketIndexStart = Math.floor(Math.log(minBucketTopNeeded) / Math.log(newBase)).toInt - 1
// minus one below since there is "+1" in `bucket(index) = base ^ (index + 1)`
var newBucketIndexEnd = Math.ceil(Math.log(maxBucketTopNeeded) / Math.log(newBase)).toInt - 1 // exclusive
var newBucketIndexStart = Math.floor(Math.log(minBucketTopNeeded) / Math.log(newBase)).toInt - 1 // inclusive
// Even if the two schemes are of same scale, they can have non-overlapping bucket ranges.
// The new bucket scheme should have at most maxBuckets, so keep reducing scale until within limits.
while (newBucketIndexEnd - newBucketIndexStart > maxBuckets) {
while (newBucketIndexEnd - newBucketIndexStart + 1 > maxBuckets) {
newScale -= 1
newBase = Math.pow(2, Math.pow(2, -newScale))
newBucketIndexEnd = Math.ceil(Math.log(maxBucketTopNeeded) / Math.log(newBase)).toInt - 1
Expand All @@ -761,7 +722,7 @@ final case class Base2ExpHistogramBuckets(scale: Int,
}

/**
* Converts an OTel exponential index to array index.
* Converts an OTel exponential index to array index (aka bucket no).
* For example if startIndexPositiveBuckets = -5 and numPositiveBuckets = 10, then
* -5 will return 1, -4 will return 2, 0 will return 6, 4 will return 10.
* Note that array index 0 is reserved for the zero bucket.
Expand Down
Loading

0 comments on commit 4ec845a

Please sign in to comment.