feat(histogram): Adding support for min max for prom and delta histograms (#1712)

* Adding Min Max Histogram Schemas
sandeep6189 authored Feb 20, 2024
1 parent 7aa80af commit feb8a6e
Showing 8 changed files with 207 additions and 12 deletions.
27 changes: 27 additions & 0 deletions core/src/main/resources/filodb-defaults.conf
@@ -183,6 +183,33 @@ filodb {
downsample-schema = "delta-histogram"
}

otel-histogram {
columns = ["timestamp:ts",
"sum:double:detectDrops=true",
"count:double:detectDrops=true",
"min:double:detectDrops=true",
"max:double:detectDrops=true",
"h:hist:counter=true"]
value-column = "h"
downsamplers = ["tTime(0)", "dLast(1)", "dLast(2)", "dMin(3)", "dMax(4)", "hLast(5)"]
# Downsample periods are determined by counter dips of the count column
downsample-period-marker = "counter(2)"
downsample-schema = "otel-histogram"
}

delta-histogram-min-max {
columns = ["timestamp:ts",
"sum:double:{detectDrops=false,delta=true}",
"count:double:{detectDrops=false,delta=true}",
"min:double:{detectDrops=false,delta=true}",
"max:double:{detectDrops=false,delta=true}",
"h:hist:{counter=false,delta=true}"]
value-column = "h"
downsamplers = ["tTime(0)", "dSum(1)", "dSum(2)", "dMin(3)", "dMax(4)", "hSum(5)"]
downsample-period-marker = "time(0)"
downsample-schema = "delta-histogram-min-max"
}
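
Both new blocks reuse the existing histogram schema layout: the downsampler list is positional, so tTime(0) applies to the timestamp column, dMin(3)/dMax(4) to the new min and max columns, and hLast(5)/hSum(5) to the histogram column. A minimal sketch of how the schemas become addressable from code, using the lookups added to Schemas.scala later in this commit (the local val names are placeholders):

    import filodb.core.metadata.Schemas

    // Backed by the "otel-histogram" and "delta-histogram-min-max" blocks defined above
    val otelHist    = Schemas.otelHistogram
    val deltaMinMax = Schemas.deltaHistogramMinMax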

# PreAgg Schema definitions.
# preagg-gauge and preagg-delta-counter have same schemas. Defining two different data-schemas to enable
# additional features/functionalities at query-time. preaggregated delta-histogram data has same schema
@@ -720,7 +720,7 @@ object RecordBuilder {
* Removes the ignoreShardKeyColumnSuffixes from LabelPair as configured in DataSet.
*
* Few metric types like Histogram, Summary exposes multiple time
* series for the same metric during a scrape by appending suffixes _bucket,_sum,_count.
* series for the same metric during a scrape by appending suffixes _bucket,_sum,_count
*
* In order to ingest all these multiple time series of a single metric to the
* same shard, we have to trim the suffixes while calculating shardKeyHash.
2 changes: 2 additions & 0 deletions core/src/main/scala/filodb.core/metadata/Schemas.scala
@@ -459,6 +459,8 @@ object Schemas extends StrictLogging {
val untyped = global.schemas("untyped")
val promHistogram = global.schemas("prom-histogram")
val deltaHistogram = global.schemas("delta-histogram")
val otelHistogram = global.schemas("otel-histogram")
val deltaHistogramMinMax = global.schemas("delta-histogram-min-max")
val preaggDeltaHistogram = global.schemas("preagg-delta-histogram")
val dsGauge = global.schemas("ds-gauge")
val preaggGauge = global.schemas("preagg-gauge")
19 changes: 16 additions & 3 deletions gateway/src/main/scala/filodb/gateway/GatewayServer.scala
@@ -28,7 +28,8 @@ import org.rogach.scallop._
import filodb.coordinator.{FilodbSettings, ShardMapper, StoreFactory}
import filodb.core.binaryrecord2.RecordBuilder
import filodb.core.metadata.Dataset
import filodb.core.metadata.Schemas.{deltaCounter, deltaHistogram, gauge, promHistogram}
import filodb.core.metadata.Schemas.{deltaCounter, deltaHistogram, deltaHistogramMinMax, gauge, otelHistogram,
promHistogram}
import filodb.gateway.conversion._
import filodb.memory.MemFactory
import filodb.timeseries.TestTimeseriesProducer
@@ -79,6 +80,10 @@ object GatewayServer extends StrictLogging {
val sourceConfigPath = trailArg[String](descr = "Path to source config, eg conf/timeseries-dev-source.conf")
val genHistData = toggle(noshort = true, descrYes = "Generate prom-histogram-schema test data and exit")
val genDeltaHistData = toggle(noshort = true, descrYes = "Generate delta-histogram-schema test data and exit")
val genOtelHistData = toggle(noshort = true,
descrYes = "Generate otel-histogram schema test data and exit")
val genDeltaHistMinMaxData = toggle(noshort = true,
descrYes = "Generate delta-histogram-min-max-schema test data and exit")
val genGaugeData = toggle(noshort = true, descrYes = "Generate Prometheus gauge-schema test data and exit")
val genCounterData = toggle(noshort = true, descrYes = "Generate Prometheus counter-schema test data and exit")
val genDeltaCounterData = toggle(noshort = true, descrYes = "Generate delta-counter-schema test data and exit")
@@ -133,12 +138,19 @@ object GatewayServer extends StrictLogging {
val genDeltaHist = userOpts.genDeltaHistData.getOrElse(false)
val genCounterData = userOpts.genCounterData.getOrElse(false)
val genDeltaCounterData = userOpts.genDeltaCounterData.getOrElse(false)
val genOtelHistData = userOpts.genOtelHistData.getOrElse(false)
val genDeltaHistMinMaxData = userOpts.genDeltaHistMinMaxData.getOrElse(false)

if (genHist || genGaugeData || genDeltaHist
|| genCounterData || genDeltaCounterData) {
|| genCounterData || genDeltaCounterData || genDeltaHistMinMaxData || genOtelHistData) {
val startTime = System.currentTimeMillis
logger.info(s"Generating $numSamples samples starting at $startTime....")

val stream = if (genHist) TestTimeseriesProducer.genHistogramData(startTime, numSeries, promHistogram)
else if (genOtelHistData) TestTimeseriesProducer.genHistogramData(startTime, numSeries,
otelHistogram)
else if (genDeltaHistMinMaxData) TestTimeseriesProducer.genHistogramData(startTime, numSeries,
deltaHistogramMinMax)
else if (genDeltaHist) TestTimeseriesProducer.genHistogramData(startTime, numSeries, deltaHistogram)
else if (genGaugeData) TestTimeseriesProducer.timeSeriesData(startTime, numSeries,
userOpts.numMetrics(), userOpts.publishIntervalSecs(), gauge)
@@ -158,7 +170,8 @@
}
Thread sleep 10000
TestTimeseriesProducer.logQueryHelp(dataset.name, userOpts.numMetrics(), numSamples, numSeries,
startTime, genHist, genDeltaHist, genGaugeData, genCounterData, userOpts.publishIntervalSecs())
startTime, genHist, genDeltaHist, genGaugeData, genCounterData, genOtelHistData, genDeltaHistMinMaxData,
userOpts.publishIntervalSecs())
logger.info(s"Waited for containers to be sent, exiting...")
sys.exit(0)
} else {
98 changes: 98 additions & 0 deletions gateway/src/main/scala/filodb/gateway/conversion/InputRecord.scala
@@ -88,6 +88,55 @@ object InputRecord {
writeKVRecord(builder, metric, tags, timestamp, value, untyped)


/**
* Writes an otel-style increasing histogram record, along with the sum, count, min and max
*/
def writeOtelHistRecord(builder: RecordBuilder,
metric: String,
tags: Map[String, String],
timestamp: Long,
kvs: Seq[(String, Double)]): Unit = {
var sum = Double.NaN
var count = Double.NaN
var min = Double.NaN
var max = Double.NaN

// Filter out sum, count, min and max, then convert and sort buckets
val sortedBuckets = kvs.filter {
case ("sum", v) => sum = v
false
case ("count", v) => count = v
false
case ("min", v) => min = v
false
case ("max", v) => max = v
false
case other => true
}.map {
case ("+Inf", v) => (Double.PositiveInfinity, v.toLong)
case (k, v) => (k.toDouble, v.toLong)
}.sorted

if (sortedBuckets.nonEmpty) {
// Build up custom histogram buckets and scheme, then encode
val buckets = CustomBuckets(sortedBuckets.map(_._1).toArray)
val hist = LongHistogram(buckets, sortedBuckets.map(_._2).toArray)

// Now, write out histogram
builder.startNewRecord(otelHistogram)
builder.addLong(timestamp)
builder.addDouble(sum)
builder.addDouble(count)
builder.addDouble(min)
builder.addDouble(max)
builder.addBlob(hist.serialize())

builder.addString(metric)
builder.addMap(tags.map { case (k, v) => (k.utf8, v.utf8) })
builder.endRecord()
}
}
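
A hedged usage sketch, mirroring the InputRecordBuilderSpec test added in this commit: bucket upper bounds are passed as stringified keys, and the sum/count/min/max aggregates ride along in the same key-value sequence (the builder, metric name, tags and values below are placeholders):

    import filodb.core.binaryrecord2.RecordBuilder
    import filodb.memory.MemFactory

    val builder = new RecordBuilder(MemFactory.onHeapFactory)
    val buckets = Seq("0.5" -> 10.0, "2.5" -> 25.0, "+Inf" -> 66.0)  // cumulative bucket counts
    val aggs    = Seq("sum" -> 209.0, "count" -> 66.0, "min" -> 10.0, "max" -> 66.0)
    InputRecord.writeOtelHistRecord(builder, "http_request_latency", Map("host" -> "H0"),
                                    System.currentTimeMillis, buckets ++ aggs)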

/**
* Writes a Prometheus-style increasing histogram record, along with the sum and count,
* using the efficient prom-histogram schema, storing the entire histogram together for efficiency.
@@ -132,6 +181,55 @@ object InputRecord {
}
}

/**
* Writes a delta non-increasing histogram record, along with the sum, count, min and max
*/
def writeDeltaHistRecordMinMax(builder: RecordBuilder,
metric: String,
tags: Map[String, String],
timestamp: Long,
kvs: Seq[(String, Double)]): Unit = {
var sum = Double.NaN
var count = Double.NaN
var min = Double.NaN
var max = Double.NaN

// Filter out sum, count, min and max, then convert and sort buckets
val sortedBuckets = kvs.filter {
case ("sum", v) => sum = v
false
case ("count", v) => count = v
false
case ("min", v) => min = v
false
case ("max", v) => max = v
false
case other => true
}.map {
case ("+Inf", v) => (Double.PositiveInfinity, v.toLong)
case (k, v) => (k.toDouble, v.toLong)
}.sorted

if (sortedBuckets.nonEmpty) {
// Build up custom histogram buckets and scheme, then encode
val buckets = CustomBuckets(sortedBuckets.map(_._1).toArray)
val hist = LongHistogram(buckets, sortedBuckets.map(_._2).toArray)

// Now, write out histogram
builder.startNewRecord(deltaHistogramMinMax)
builder.addLong(timestamp)
builder.addDouble(sum)
builder.addDouble(count)
builder.addDouble(min)
builder.addDouble(max)
builder.addBlob(hist.serialize())

builder.addString(metric)
builder.addMap(tags.map { case (k, v) => (k.utf8, v.utf8) })
builder.endRecord()
}
}
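
The delta variant is invoked identically; the difference is the target schema (delta-histogram-min-max), whose bucket counts cover a single publish interval rather than accumulating. A minimal sketch, reusing the placeholder builder and values from the otel example above:

    InputRecord.writeDeltaHistRecordMinMax(builder, "http_request_latency_delta", Map("host" -> "H0"),
                                           System.currentTimeMillis, buckets ++ aggs)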

/**
* Writes a delta non-increasing histogram record, along with the sum and count,
* using the delta-histogram schema, storing the entire histogram together for efficiency.
@@ -64,14 +64,15 @@ object TestTimeseriesProducer extends StrictLogging {
//scalastyle:off method.length parameter.number
def logQueryHelp(dataset: String, numMetrics: Int, numSamples: Int, numTimeSeries: Int, startTimeMs: Long,
genHist: Boolean, genDeltaHist: Boolean, genGauge: Boolean,
genPromCounter: Boolean, publishIntervalSec: Int): Unit = {
genPromCounter: Boolean, genOtelHist: Boolean, genDeltaHistMinMax: Boolean,
publishIntervalSec: Int): Unit = {
val startQuery = startTimeMs / 1000
val endQuery = startQuery + (numSamples / numMetrics / numTimeSeries) * publishIntervalSec
logger.info(s"Finished producing $numSamples records for ${(endQuery-startQuery).toDouble/60} minutes")

val metricName = if (genGauge) "heap_usage0"
else if (genHist) "http_request_latency"
else if (genDeltaHist) "http_request_latency_delta"
else if (genHist || genOtelHist) "http_request_latency"
else if (genDeltaHist || genDeltaHistMinMax) "http_request_latency_delta"
else if (genPromCounter) "heap_usage_counter0"
else "heap_usage_delta0"

@@ -206,7 +207,7 @@ object TestTimeseriesProducer extends StrictLogging {
val numBuckets = 10
val histBucketScheme = bv.GeometricBuckets(2.0, 3.0, numBuckets)
var buckets = new Array[Long](numBuckets)
val metric = if (Schemas.deltaHistogram == histSchema) {
val metric = if (Schemas.deltaHistogram == histSchema || Schemas.deltaHistogramMinMax == histSchema) {
"http_request_latency_delta"
} else {
"http_request_latency"
@@ -228,7 +229,8 @@
val host = (instance >> 4) & twoBitMask
val timestamp = startTime + (n.toLong / numTimeSeries) * 10000 // generate 1 sample every 10s for each instance
// reset buckets for delta histograms
if (Schemas.deltaHistogram == histSchema && prevTimestamp != timestamp) {
if ( (Schemas.deltaHistogram == histSchema || Schemas.deltaHistogramMinMax == histSchema )
&& prevTimestamp != timestamp) {
prevTimestamp = timestamp
buckets = new Array[Long](numBuckets)
}
Expand All @@ -244,7 +246,14 @@ object TestTimeseriesProducer extends StrictLogging {
hostUTF8 -> s"H$host".utf8,
instUTF8 -> s"Instance-$instance".utf8)

new MetricTagInputRecord(Seq(timestamp, sum, count, hist), metric, tags, histSchema)
if (histSchema == Schemas.deltaHistogramMinMax || histSchema == Schemas.otelHistogram) {
val minVal = buckets.min.toDouble
val maxVal = buckets.max.toDouble
new MetricTagInputRecord(Seq(timestamp, sum, count, minVal, maxVal, hist), metric, tags, histSchema)
}
else {
new MetricTagInputRecord(Seq(timestamp, sum, count, hist), metric, tags, histSchema)
}
}
}
}
@@ -10,6 +10,7 @@ import org.scalatest.matchers.should.Matchers

class InputRecordBuilderSpec extends AnyFunSpec with Matchers {
val builder = new RecordBuilder(MemFactory.onHeapFactory)
val builder2 = new RecordBuilder(MemFactory.onHeapFactory)

val baseTags = Map("dataset" -> "timeseries",
"host" -> "MacBook-Pro-229.local",
@@ -19,7 +20,10 @@ class InputRecordBuilderSpec extends AnyFunSpec with Matchers {
val counts = Array(10L, 20L, 25, 38, 50, 66)
val sum = counts.sum.toDouble
val count = 50.0
val min = counts.min.toDouble
val max = counts.max.toDouble
val sumCountKVs = Seq("sum" -> sum, "count" -> count)
val sumCountMinMaxKVs = Seq("sum" -> sum, "count" -> count, "min" -> min, "max" -> max)

it("should writePromHistRecord to BR and be able to deserialize it") {
val buckets = Array(0.5, 1, 2.5, 5, 10, Double.PositiveInfinity)
@@ -55,6 +59,44 @@
}
}

it("should writeDeltaHistRecordMinMax to BR and be able to deserialize it") {
val buckets = Array(0.5, 1, 2.5, 5, 10, Double.PositiveInfinity)
val expected = LongHistogram(CustomBuckets(buckets), counts)

val bucketKVs = buckets.zip(counts).map {
case (Double.PositiveInfinity, c) => "+Inf" -> c.toDouble
case (b, c) => b.toString -> c.toDouble
}.toSeq
// sum/count/min/max at end
InputRecord.writeDeltaHistRecordMinMax(builder2, metric, baseTags, 100000L, bucketKVs ++ sumCountMinMaxKVs)
builder2.allContainers.head.iterate(Schemas.deltaHistogramMinMax.ingestionSchema).foreach { row =>
row.getDouble(1) shouldEqual sum
row.getDouble(2) shouldEqual count
row.getDouble(3) shouldEqual min
row.getDouble(4) shouldEqual max
row.getHistogram(5) shouldEqual expected
}
}

it("should otelHistogram to BR and be able to deserialize it") {
val buckets = Array(0.5, 1, 2.5, 5, 10, Double.PositiveInfinity)
val expected = LongHistogram(CustomBuckets(buckets), counts)

val bucketKVs = buckets.zip(counts).map {
case (Double.PositiveInfinity, c) => "+Inf" -> c.toDouble
case (b, c) => b.toString -> c.toDouble
}.toSeq
// sum/count/min/max at end
InputRecord.writeOtelHistRecord(builder2, metric, baseTags, 100000L, bucketKVs ++ sumCountMinMaxKVs)
builder2.allContainers.head.iterate(Schemas.otelHistogram.ingestionSchema).foreach { row =>
row.getDouble(1) shouldEqual sum
row.getDouble(2) shouldEqual count
row.getDouble(3) shouldEqual min
row.getDouble(4) shouldEqual max
row.getHistogram(5) shouldEqual expected
}
}

it("should skip empty histograms via writePromHistRecord, and write subsequent records") {
builder.reset()
InputRecord.writePromHistRecord(builder, metric, baseTags, 100000L, sumCountKVs)
@@ -67,15 +67,19 @@ final case class MultiSchemaPartitionsExec(queryContext: QueryContext,

/*
* As part of Histogram query compatibility with Prometheus format histograms, we
* remove _sum & _count suffix from metric name here. _bucket & le are already removed in SingleClusterPlanner.
* We remove the suffix only when partition lookup does not return any results
* remove _sum, _count, _min, _max suffix from metric name here. _bucket & le are already
* removed in SingleClusterPlanner. We remove the suffix only when partition lookup does not return any results
*/
if (lookupRes.firstSchemaId.isEmpty && querySession.queryConfig.translatePromToFilodbHistogram &&
colName.isEmpty && metricName.isDefined) {
val res = if (metricName.get.endsWith("_sum"))
removeSuffixAndGenerateLookupResult(filters, metricName.get, "sum", source, querySession)
else if (metricName.get.endsWith("_count"))
removeSuffixAndGenerateLookupResult(filters, metricName.get, "count", source, querySession)
else if (metricName.get.endsWith("_min"))
removeSuffixAndGenerateLookupResult(filters, metricName.get, "min", source, querySession)
else if (metricName.get.endsWith("_max"))
removeSuffixAndGenerateLookupResult(filters, metricName.get, "max", source, querySession)
else (lookupRes, newColName)

lookupRes = res._1
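
The suffix fallback only kicks in when the initial partition lookup returns no schema. A hypothetical standalone sketch of the idea (not FiloDB's removeSuffixAndGenerateLookupResult, which also re-runs the partition lookup against the store):

    // Illustrative only: map a Prometheus-style suffixed metric name back to its base histogram name
    val histogramSuffixes = Seq("_sum", "_count", "_min", "_max")
    def baseMetricName(metric: String): Option[String] =
      histogramSuffixes.find(s => metric.endsWith(s)).map(s => metric.dropRight(s.length))

    // baseMetricName("http_request_latency_max") => Some("http_request_latency")
    // baseMetricName("http_request_latency")     => None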
