
feat(core): Min-max versions of histogram quantile and fraction for exp histograms #1891

Merged
@@ -980,6 +980,7 @@ object ProtoConverters {
case InternalRangeFunction.AvgWithSumAndCountOverTime =>
GrpcMultiPartitionQueryService.InternalRangeFunction.AVG_WITH_SUM_AND_COUNT_OVER_TIME
case InternalRangeFunction.SumAndMaxOverTime => GrpcMultiPartitionQueryService.InternalRangeFunction.SUM_AND_MAX_OVER_TIME
+    case InternalRangeFunction.RateAndMinMaxOverTime => GrpcMultiPartitionQueryService.InternalRangeFunction.RATE_AND_MIN_MAX_OVER_TIME
case InternalRangeFunction.LastSampleHistMaxMin => GrpcMultiPartitionQueryService.InternalRangeFunction.LAST_SAMPLE_HIST_MAX_MIN
case InternalRangeFunction.Timestamp => GrpcMultiPartitionQueryService.InternalRangeFunction.TIME_STAMP
case InternalRangeFunction.AbsentOverTime => GrpcMultiPartitionQueryService.InternalRangeFunction.ABSENT_OVER_TIME
@@ -1017,6 +1018,7 @@ object ProtoConverters {
case GrpcMultiPartitionQueryService.InternalRangeFunction.AVG_WITH_SUM_AND_COUNT_OVER_TIME =>
InternalRangeFunction.AvgWithSumAndCountOverTime
case GrpcMultiPartitionQueryService.InternalRangeFunction.SUM_AND_MAX_OVER_TIME => InternalRangeFunction.SumAndMaxOverTime
+    case GrpcMultiPartitionQueryService.InternalRangeFunction.RATE_AND_MIN_MAX_OVER_TIME => InternalRangeFunction.RateAndMinMaxOverTime
case GrpcMultiPartitionQueryService.InternalRangeFunction.LAST_SAMPLE_HIST_MAX_MIN => InternalRangeFunction.LastSampleHistMaxMin
case GrpcMultiPartitionQueryService.InternalRangeFunction.TIME_STAMP => InternalRangeFunction.Timestamp
case GrpcMultiPartitionQueryService.InternalRangeFunction.ABSENT_OVER_TIME => InternalRangeFunction.AbsentOverTime
1 change: 1 addition & 0 deletions grpc/src/main/protobuf/query_service.proto
@@ -641,6 +641,7 @@ enum InternalRangeFunction {
ABSENT_OVER_TIME = 26;
PRESENT_OVER_TIME = 27;
MEDIAN_ABSOLUTE_DEVIATION_OVER_TIME = 28;
+  RATE_AND_MIN_MAX_OVER_TIME = 29;
}

enum SortFunctionId {
@@ -34,7 +34,7 @@ import filodb.timeseries.TestTimeseriesProducer
*/
@State(Scope.Thread)
class Base2ExponentialHistogramQueryBenchmark extends StrictLogging {
-  org.slf4j.LoggerFactory.getLogger("filodb").asInstanceOf[Logger].setLevel(Level.DEBUG)
+  org.slf4j.LoggerFactory.getLogger("filodb").asInstanceOf[Logger].setLevel(Level.WARN)

import filodb.coordinator._
import client.Client.{actorAsk, asyncAsk}
@@ -46,7 +46,7 @@ class Base2ExponentialHistogramQueryBenchmark extends StrictLogging {

// TODO: move setup and ingestion to another trait
val system = ActorSystem("test", ConfigFactory.load("filodb-defaults.conf")
-    .withValue("filodb.memstore.ingestion-buffer-mem-size", ConfigValueFactory.fromAnyRef("30MB")))
+    .withValue("filodb.memstore.ingestion-buffer-mem-size", ConfigValueFactory.fromAnyRef("300MB")))

private val cluster = FilodbCluster(system)
cluster.join()
@@ -90,11 +90,12 @@ class Base2ExponentialHistogramQueryBenchmark extends StrictLogging {
val (producingFut, containerStream) = TestTimeseriesProducer.metricsToContainerStream(startTime, numShards, numSeries,
numMetricNames = 1, numSamplesPerTs * numSeries, dataset, shardMapper, spread,
publishIntervalSec = 10, numBuckets = numBuckets, expHist = true)
+    val endTime = startTime + (numSamplesPerTs * 10000)
val ingestTask = containerStream.groupBy(_._1)
      // Asynchronously subscribe and ingest each shard
.mapParallelUnordered(numShards) { groupedStream =>
val shard = groupedStream.key
println(s"Starting ingest exp histograms on shard $shard...")
println(s"Starting ingest exp histograms on shard $shard from timestamp $startTime to $endTime")
val shardStream = groupedStream.zipWithIndex.flatMap { case ((_, bytes), idx) =>
val data = bytes.map { array => SomeData(RecordContainer(array), idx) }
Observable.fromIterable(data)
@@ -117,13 +118,15 @@
val histQuantileQuery =
"""histogram_quantile(0.7, sum(rate(http_request_latency_delta{_ws_="demo", _ns_="App-0"}[5m])))"""
val queries = Seq(histQuantileQuery)
-    val queryTime = startTime + (7 * 60 * 1000) // 5 minutes from start until 60 minutes from start
+    val queryStartTime = startTime/1000 + (5 * 60) // 5 minutes from start until 60 minutes from start
    val queryStep = 120 // # of seconds between each query sample "step"
-    val qParams = TimeStepParams(queryTime/1000, queryStep, (queryTime/1000) + queryIntervalMin*60)
+    val queryEndTime = queryStartTime + queryIntervalMin*60
+    val qParams = TimeStepParams(queryStartTime, queryStep, queryEndTime)
val logicalPlans = queries.map { q => Parser.queryRangeToLogicalPlan(q, qParams) }
val queryCommands = logicalPlans.map { plan =>
LogicalPlan2Query(dataset.ref, plan, QueryContext(Some(new StaticSpreadProvider(SpreadChange(0, spread))), 20000))
}
println(s"Querying data from $queryStartTime to $queryEndTime")

var queriesSucceeded = 0
var queriesFailed = 0
99 changes: 30 additions & 69 deletions memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala
@@ -62,28 +62,36 @@ trait Histogram extends Ordered[Histogram] {
/**
* Calculates histogram quantile based on bucket values using Prometheus scheme (increasing/LE)
*/
-  def quantile(q: Double): Double = {
+  def quantile(q: Double,
+               min: Double = 0, // negative observations not supported yet
+               max: Double = Double.PositiveInfinity): Double = {
    val result = if (q < 0) Double.NegativeInfinity
    else if (q > 1) Double.PositiveInfinity
-    else if (numBuckets < 2) Double.NaN
+    else if (numBuckets < 2 || topBucketValue <= 0) Double.NaN
else {
// find rank for the quantile using total number of occurrences (which is the last bucket value)
var rank = q * topBucketValue
// using rank, find the le bucket which would have the identified rank
-      val b = firstBucketGTE(rank)
+      val bucket = firstBucketGTE(rank)

+      // current bucket lower and upper bound; negative observations not supported yet - to be done later
+      var bucketStart = if (bucket == 0) 0 else bucketTop(bucket-1)
+      var bucketEnd = bucketTop(bucket)
+      // if min and max are in this bucket, adjust the bucket start and end
+      if (min > bucketStart && min <= bucketEnd) bucketStart = min
+      if (max > bucketStart && max <= bucketEnd) bucketEnd = max

// now calculate quantile. If bucket is last one and last bucket is +Inf then return second-to-last bucket top
// as we cannot interpolate to +Inf.
-      if (b == numBuckets-1 && bucketTop(numBuckets - 1).isPosInfinity) return bucketTop(numBuckets-2)
-      else if (b == 0 && bucketTop(0) <= 0) return bucketTop(0)
-      else {
-        // interpolate quantile within le bucket
-        var (bucketStart, bucketEnd, count) = (0d, bucketTop(b), bucketValue(b))
-        if (b > 0) {
-          bucketStart = bucketTop(b-1)
-          count -= bucketValue(b-1)
-          rank -= bucketValue(b-1)
-        }
+      if (bucket == numBuckets-1 && bucketTop(numBuckets - 1).isPosInfinity) {
+        return bucketTop(numBuckets-2)
+      } else if (bucket == 0 && bucketTop(0) <= 0) {
+        return bucketTop(0) // zero or negative bucket
+      } else {

+        // interpolate quantile within boundaries of "bucket"
+        val count = if (bucket == 0) bucketValue(bucket) else bucketValue(bucket) - bucketValue(bucket-1)
+        rank -= (if (bucket == 0) 0 else bucketValue(bucket-1))
val fraction = rank/count
if (!hasExponentialBuckets || bucketStart == 0) {
bucketStart + (bucketEnd-bucketStart) * fraction
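
For intuition, here is a minimal, self-contained Scala sketch of the clamping logic this hunk introduces, assuming a plain Prometheus-style cumulative (LE) bucket layout. The names (`bucketTops`, `cumCounts`) and sample numbers are illustrative, not FiloDB's API:

```scala
object QuantileSketch extends App {
  // Bucket tops and cumulative less-than-or-equal counts, Prometheus-style.
  val bucketTops = Array(1.0, 2.0, 4.0, 8.0, Double.PositiveInfinity)
  val cumCounts  = Array(0.0, 10.0, 50.0, 90.0, 100.0)

  def quantile(q: Double, min: Double = 0.0, max: Double = Double.PositiveInfinity): Double = {
    if (q < 0) return Double.NegativeInfinity
    if (q > 1) return Double.PositiveInfinity
    val rank = q * cumCounts.last                // rank of the target observation
    val b    = cumCounts.indexWhere(_ >= rank)   // first bucket whose cumulative count covers rank
    // cannot interpolate into a +Inf top bucket, so clip to the second-to-last top
    if (b == bucketTops.length - 1 && bucketTops(b).isPosInfinity) return bucketTops(b - 1)
    var bucketStart = if (b == 0) 0.0 else bucketTops(b - 1)
    var bucketEnd   = bucketTops(b)
    // the change under review: tighten the bucket bounds with the recorded min/max
    if (min > bucketStart && min <= bucketEnd) bucketStart = min
    if (max > bucketStart && max <= bucketEnd) bucketEnd = max
    val count   = if (b == 0) cumCounts(b) else cumCounts(b) - cumCounts(b - 1)
    val adjRank = rank - (if (b == 0) 0.0 else cumCounts(b - 1))
    bucketStart + (bucketEnd - bucketStart) * (adjRank / count)
  }

  println(quantile(0.8))            // 7.0: interpolates across the whole [4, 8] bucket
  println(quantile(0.8, max = 6.0)) // 5.5: a known max of 6 shrinks the bucket to [4, 6]
}
```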
@@ -133,9 +141,9 @@ trait Histogram extends Ordered[Histogram] {
val b = it.next()
val zeroBucket = (b == 0)
val bucketUpper = bucketTop(b)
-    val bucketLower = if (b == 0) 0.0 else bucketTop(b - 1)
+    val bucketLower = if (zeroBucket) 0.0 else bucketTop(b - 1)
val bucketVal = bucketValue(b)
-    val prevBucketVal = if (b == 0) 0.0 else bucketValue(b - 1)
+    val prevBucketVal = if (zeroBucket) 0.0 else bucketValue(b - 1)

// Define interpolation functions
def interpolateLinearly(v: Double): Double = {
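
The hunk truncates inside `interpolateLinearly`, so for context, here is a hedged standalone sketch of what such a helper computes when estimating a histogram fraction: the share of observations at or below a value `v`, assuming observations are spread linearly within a bucket (exponential buckets would interpolate logarithmically). This approximates the idea only, not FiloDB's exact code:

```scala
// Estimate the fraction of observations <= v from one LE bucket's bounds,
// the cumulative counts at and before it, and the total observation count.
def fractionUpTo(v: Double, bucketLower: Double, bucketUpper: Double,
                 prevBucketVal: Double, bucketVal: Double, total: Double): Double = {
  val within = (v - bucketLower) / (bucketUpper - bucketLower)  // where v sits inside the bucket
  val rank   = prevBucketVal + (bucketVal - prevBucketVal) * within
  rank / total
}

// e.g. fractionUpTo(3.0, 2.0, 4.0, 10.0, 50.0, 100.0) == 0.3
```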
@@ -264,7 +272,6 @@ final case class LongHistogram(var buckets: HistogramBuckets, var values: Array[
}
// TODO if otel histogram, the need to add values in a different way
// see if we can refactor since MutableHistogram also has this logic
-    assert(other.buckets == buckets)
cforRange { 0 until numBuckets } { b =>
values(b) += other.values(b)
}
@@ -433,54 +440,6 @@ object MutableHistogram {
}
}

- /**
-  * MaxMinHistogram improves quantile calculation accuracy with a known max value recorded from the client.
-  * Whereas normally Prom histograms have +Inf as the highest bucket, and we cannot interpolate above the last
-  * non-Inf bucket, having a max allows us to interpolate from the rank up to the max.
-  * When the max value is lower, we interpolate between the bottom of the bucket and the max value.
-  * Both changes mean that the 0.90+ quantiles return much closer to the max value, instead of interpolating or clipping.
-  * The quantile result can never be above max, regardless of the bucket scheme.
-  *
-  * UPDATE: Adding support for min value as well to make sure the quantile is never below the minimum specified value.
-  * By default, the minimum value is set to 0.0
-  */
- final case class MaxMinHistogram(innerHist: HistogramWithBuckets, max: Double, min: Double = 0.0)
-   extends HistogramWithBuckets {
-   final def buckets: HistogramBuckets = innerHist.buckets
-   final def bucketValue(no: Int): Double = innerHist.bucketValue(no)
-
-   def serialize(intoBuf: Option[MutableDirectBuffer] = None): MutableDirectBuffer = ???
-
-   override def quantile(q: Double): Double = {
-     val result = if (q < 0) Double.NegativeInfinity
-     else if (q > 1) Double.PositiveInfinity
-     else if (numBuckets < 2) Double.NaN
-     else {
-       // find rank for the quantile using total number of occurrences (which is the last bucket value)
-       var rank = q * topBucketValue
-       // using rank, find the le bucket which would have the identified rank
-       val bucketNum = firstBucketGTE(rank)
-
-       // now calculate quantile. No need to special case top bucket since we will always cap top at max
-       if (bucketNum == 0 && bucketTop(0) <= 0) return bucketTop(0)
-       else {
-         // interpolate quantile within le bucket
-         var (bucketStart, bucketEnd, count) = (Math.max(0d, min),
-           Math.min(bucketTop(bucketNum), max), bucketValue(bucketNum))
-         if (bucketNum > 0) {
-           bucketStart = bucketTop(bucketNum-1)
-           count -= bucketValue(bucketNum-1)
-           rank -= bucketValue(bucketNum-1)
-         }
-         bucketStart + (bucketEnd-bucketStart)*(rank/count)
-       }
-     }
-     result
-   }
-
-
- }

/**
* A scheme for buckets in a histogram. Since these are Prometheus-style histograms,
* each bucket definition consists of occurrences of numbers which are less than or equal to the bucketTop
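
The deletion above folds MaxMinHistogram's behavior into the new min/max parameters on Histogram.quantile. A rough before/after at a hypothetical call site (names illustrative; `min` and `max` stand for the recorded per-series values):

```scala
// before: wrap the histogram so quantiles are clamped by the recorded max/min
// val p90 = MaxMinHistogram(hist, max, min).quantile(0.9)

// after: pass the recorded min/max straight into the unified implementation
// val p90 = hist.quantile(0.9, min, max)
```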
@@ -635,7 +594,8 @@ final case class GeometricBuckets(firstBucket: Double,

object Base2ExpHistogramBuckets {
// TODO: make maxBuckets default configurable; not straightforward to get handle to global config from here
-  val maxBuckets = 200
+  // see PR for benchmark test results based on which maxBuckets was fixed. Don't increase without analysis.
+  val maxBuckets = 180
val maxAbsScale = 100
val maxAbsBucketIndex = 500
}
@@ -746,11 +706,12 @@ final case class Base2ExpHistogramBuckets(scale: Int,
val maxBucketTopNeeded = Math.max(endBucketTop, o.endBucketTop)
var newScale = Math.min(scale, o.scale)
var newBase = Math.max(base, o.base)
-    var newBucketIndexEnd = Math.ceil(Math.log(maxBucketTopNeeded) / Math.log(newBase)).toInt - 1
-    var newBucketIndexStart = Math.floor(Math.log(minBucketTopNeeded) / Math.log(newBase)).toInt - 1
+    // minus one below since there is "+1" in `bucket(index) = base ^ (index + 1)`
+    var newBucketIndexEnd = Math.ceil(Math.log(maxBucketTopNeeded) / Math.log(newBase)).toInt - 1 // exclusive
+    var newBucketIndexStart = Math.floor(Math.log(minBucketTopNeeded) / Math.log(newBase)).toInt - 1 // inclusive
// Even if the two schemes are of same scale, they can have non-overlapping bucket ranges.
// The new bucket scheme should have at most maxBuckets, so keep reducing scale until within limits.
-    while (newBucketIndexEnd - newBucketIndexStart > maxBuckets) {
+    while (newBucketIndexEnd - newBucketIndexStart + 1 > maxBuckets) {
newScale -= 1
newBase = Math.pow(2, Math.pow(2, -newScale))
newBucketIndexEnd = Math.ceil(Math.log(maxBucketTopNeeded) / Math.log(newBase)).toInt - 1
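
For intuition on the index arithmetic above, a small sketch of the OTel base/scale relationship (illustrative only; `bucketTop` follows the `base ^ (index + 1)` convention noted in the new comment):

```scala
// OTel exponential-histogram base for a given scale: base = 2^(2^-scale)
def base(scale: Int): Double = math.pow(2, math.pow(2, -scale))

// Top boundary of the bucket at `index` for a given scale.
def bucketTop(index: Int, scale: Int): Double = math.pow(base(scale), index + 1)

// Dropping the scale by 1 squares the base (base(2) == base(3) * base(3),
// i.e. ~1.1892 vs ~1.0905), so each coarser bucket spans two finer ones.
// That is why the while-loop above keeps decrementing newScale until the
// index range fits within maxBuckets.
```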
@@ -761,7 +722,7 @@ final case class Base2ExpHistogramBuckets(scale: Int,
}

/**
-   * Converts an OTel exponential index to array index.
+   * Converts an OTel exponential index to array index (aka bucket no).
* For example if startIndexPositiveBuckets = -5 and numPositiveBuckets = 10, then
* -5 will return 1, -4 will return 2, 0 will return 6, 4 will return 10.
* Know that 0 array index is reserved for zero bucket.
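
A sketch of the mapping described in that comment, as an assumed helper (array slot 0 is reserved for the zero bucket, hence the +1):

```scala
def otelIndexToArrayIndex(otelIndex: Int, startIndexPositiveBuckets: Int): Int =
  otelIndex - startIndexPositiveBuckets + 1  // +1 skips the zero bucket at slot 0

// with startIndexPositiveBuckets = -5 and numPositiveBuckets = 10:
// otelIndexToArrayIndex(-5, -5) == 1
// otelIndexToArrayIndex( 0, -5) == 6
// otelIndexToArrayIndex( 4, -5) == 10
```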