feat(core): OTel Exponential Histograms #1879

Merged: 24 commits, Nov 13, 2024

Changes from all commits (24 commits):
81499c4
OTel Histogram Buckets
vishramachandran Oct 24, 2024
a304c11
remove array copy
vishramachandran Oct 24, 2024
ae0e48c
ser deser working
vishramachandran Oct 24, 2024
18186d0
Downsample Main Spec working for OTel histograms
vishramachandran Oct 27, 2024
47c041f
Limit number of buckets when adding histograms
vishramachandran Oct 28, 2024
4f7dacd
Synthetic generation of OTel Exp buckets
vishramachandran Oct 28, 2024
5caf5a5
queries working well
vishramachandran Oct 30, 2024
d029e12
call out TODOs for cumulative otel exp histograms
vishramachandran Oct 30, 2024
bb82bc3
Zero count; numBuckets instead of endBucket, negative buckets placeho…
vishramachandran Oct 31, 2024
8a002be
JMH benchmark for exponential histograms
vishramachandran Nov 4, 2024
e9dcd17
JMH benchmark for exponential histograms - really works now; tested d…
vishramachandran Nov 4, 2024
bb7e39f
small changes
vishramachandran Nov 4, 2024
5c2ff27
Input Record changes for OTel exponential histograms
vishramachandran Nov 6, 2024
9211c07
PR comments
vishramachandran Nov 6, 2024
d1ad0f9
test failure
vishramachandran Nov 6, 2024
664c4c6
fraction core function working
vishramachandran Nov 5, 2024
303f9b6
histogram_fraction function
vishramachandran Nov 6, 2024
b5a678a
test addition
vishramachandran Nov 6, 2024
136a4c9
remove conversion to cumulative in InputRecord api. Client should do it
vishramachandran Nov 6, 2024
0990625
Add tests for histogram_fraction on custom buckets
vishramachandran Nov 7, 2024
8ee7d0a
optionally use min and max for hist_fraction
vishramachandran Nov 7, 2024
82f8381
histogram_quantile calculation correctness for exponential buckets
vishramachandran Nov 7, 2024
528ee55
More docs and small cleanup
vishramachandran Nov 12, 2024
246e381
scalastyle
vishramachandran Nov 12, 2024
@@ -55,14 +55,16 @@ object NodeClusterActor {
* @param source the IngestionSource on each node. Use noOpSource to not start ingestion and
* manually push records into NodeCoordinator.
* @param storeConfig a StoreConfig for the MemStore.
+ * @param overrideSchema if true, will override the schema in the MetaStore with the schema in the Dataset.
* @return DatasetVerified - meaning the dataset and columns are valid. Does not mean ingestion is
* setup on all nodes - for that, subscribe to ShardMapUpdate's
*/
final case class SetupDataset(dataset: Dataset,
resources: DatasetResourceSpec,
source: IngestionSource,
storeConfig: StoreConfig,
- downsampleConfig: DownsampleConfig = DownsampleConfig.disabled) {
+ downsampleConfig: DownsampleConfig = DownsampleConfig.disabled,
+ overrideSchema: Boolean = true) {
import collection.JavaConverters._
val resourceConfig = ConfigFactory.parseMap(
Map("num-shards" -> resources.numShards, "min-num-nodes" -> resources.minNumNodes).asJava)
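Note: the added overrideSchema flag defaults to true, so existing callers keep the old behavior. Below is a minimal sketch of a caller opting out; the values in scope and the clusterActor reference are hypothetical, not part of this PR.

// Hypothetical caller: everything except the new flag is assumed to be in scope.
val setup = SetupDataset(
  dataset,                 // filodb.core.metadata.Dataset
  resources,               // DatasetResourceSpec
  source,                  // IngestionSource
  storeConfig,             // StoreConfig
  overrideSchema = false   // new: keep the schema already registered in the MetaStore
)
clusterActor ! setup       // clusterActor: hypothetical ActorRef to the NodeClusterActor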
@@ -189,10 +189,10 @@ private[filodb] final class NodeCoordinatorActor(metaStore: MetaStore,
}

def ingestHandlers: Receive = LoggingReceive {
- case SetupDataset(dataset, resources, source, storeConf, downsample) =>
+ case SetupDataset(dataset, resources, source, storeConf, downsample, overrideSchema) =>
// used only in unit tests
if (!(ingesters contains dataset.ref)) {
- setupDataset(dataset, storeConf, resources.numShards, source, downsample, true)
+ setupDataset(dataset, storeConf, resources.numShards, source, downsample, overrideSchema)
}

case IngestRows(dataset, shard, rows) =>
@@ -898,6 +898,7 @@ object ProtoConverters {
case filodb.query.InstantFunctionId.HistogramQuantile => GrpcMultiPartitionQueryService.InstantFunctionId.HISTOGRAM_QUANTILE
case filodb.query.InstantFunctionId.HistogramMaxQuantile => GrpcMultiPartitionQueryService.InstantFunctionId.HISTOGRAM_MAX_QUANTILE
case filodb.query.InstantFunctionId.HistogramBucket => GrpcMultiPartitionQueryService.InstantFunctionId.HISTOGRAM_BUCKET
+ case filodb.query.InstantFunctionId.HistogramFraction => GrpcMultiPartitionQueryService.InstantFunctionId.HISTOGRAM_FRACTION
case filodb.query.InstantFunctionId.Ln => GrpcMultiPartitionQueryService.InstantFunctionId.LN
case filodb.query.InstantFunctionId.Log10 => GrpcMultiPartitionQueryService.InstantFunctionId.LOG10
case filodb.query.InstantFunctionId.Log2 => GrpcMultiPartitionQueryService.InstantFunctionId.LOG2
@@ -929,6 +930,7 @@ object ProtoConverters {
case GrpcMultiPartitionQueryService.InstantFunctionId.HISTOGRAM_QUANTILE => filodb.query.InstantFunctionId.HistogramQuantile
case GrpcMultiPartitionQueryService.InstantFunctionId.HISTOGRAM_MAX_QUANTILE => filodb.query.InstantFunctionId.HistogramMaxQuantile
case GrpcMultiPartitionQueryService.InstantFunctionId.HISTOGRAM_BUCKET => filodb.query.InstantFunctionId.HistogramBucket
+ case GrpcMultiPartitionQueryService.InstantFunctionId.HISTOGRAM_FRACTION => filodb.query.InstantFunctionId.HistogramFraction
case GrpcMultiPartitionQueryService.InstantFunctionId.LN => filodb.query.InstantFunctionId.Ln
case GrpcMultiPartitionQueryService.InstantFunctionId.LOG10 => filodb.query.InstantFunctionId.Log10
case GrpcMultiPartitionQueryService.InstantFunctionId.LOG2 => filodb.query.InstantFunctionId.Log2
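For context, the newly wired histogram_fraction(lower, upper, hist) estimates the fraction of observations falling between two bounds, complementing histogram_quantile. A rough sketch of that estimate over cumulative bucket counts follows; linear interpolation inside the boundary bucket is an assumption borrowed from Prometheus-style semantics, not FiloDB's actual implementation.

// Sketch: estimate P(observation <= x) from (bucketUpperBound, cumulativeCount)
// pairs sorted by upper bound.
def fractionLe(x: Double, buckets: Seq[(Double, Double)]): Double = {
  val total = buckets.last._2
  var prevBound = 0.0
  var prevCount = 0.0
  for ((bound, count) <- buckets) {
    if (x <= bound) {
      // linearly interpolate within the bucket containing x (assumed detail)
      val within =
        if (bound == prevBound) 0.0
        else (count - prevCount) * (x - prevBound) / (bound - prevBound)
      return (prevCount + within) / total
    }
    prevBound = bound
    prevCount = count
  }
  1.0 // x lies beyond the last bucket boundary
}
// histogram_fraction(lower, upper, h) ~= fractionLe(upper, h) - fractionLe(lower, h)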
core/src/main/scala/filodb.core/metadata/Schemas.scala (1 addition, 1 deletion)
@@ -284,7 +284,7 @@ final case class Schemas(part: PartitionSchema,
}

private def bytesPerSampleSwagString = bytesPerSampleSwag.map { case (k, v) =>
s"${schemaName(k._1)} ColId:${k._2} : $v"
s"${schemaName(k._1)} ${k._1} ColId:${k._2} : $v"
}

Schemas._log.info(s"bytesPerSampleSwag: $bytesPerSampleSwagString")
core/src/test/scala/filodb.core/TestData.scala (27 additions, 0 deletions)
@@ -356,6 +356,10 @@ object MachineMetricsData {
Seq("timestamp:ts", "count:long", "sum:long", "h:hist:counter=false"),
options = DatasetOptions(shardKeyColumns = Seq("_ws_", "_ns_", "metric"), "metric"))

+ val expHistDataset = Dataset("histogram", Seq("metric:string", "tags:map"),
+ Seq("timestamp:ts", "count:long", "sum:long", "min:long", "max:long", "h:hist:counter=false"),
+ options = DatasetOptions(shardKeyColumns = Seq("_ws_", "_ns_", "metric"), "metric"))

var histBucketScheme: bv.HistogramBuckets = _
def linearHistSeries(startTs: Long = 100000L, numSeries: Int = 10, timeStep: Int = 1000, numBuckets: Int = 8,
infBucket: Boolean = false, ws: String = "demo"):
@@ -383,6 +387,29 @@ object MachineMetricsData {
}
}

def otelDeltaExponentialHistSeries(startTs: Long = 100000L, numSeries: Int = 10,
timeStep: Int = 10000, numBuckets: Int = 160, ws: String = "demo"):
Stream[Seq[Any]] = {
histBucketScheme = bv.Base2ExpHistogramBuckets(3, -20, numBuckets - 1)
val buckets = new Array[Long](numBuckets)
def updateBuckets(bucketNo: Int): Unit = {
for { b <- bucketNo until numBuckets } {
buckets(b) += 1
}
}
Stream.from(0).map { n =>
updateBuckets(n % numBuckets)
Seq(startTs + n * timeStep,
(1 + n).toLong,
buckets.sum,
buckets.min,
buckets.max,
bv.LongHistogram(histBucketScheme, buckets.map(x => x)),
"request-latency",
extraTags ++ Map("_ws_".utf8 -> ws.utf8, "_ns_".utf8 -> "testapp".utf8, "dc".utf8 -> s"${n % numSeries}".utf8))
}
}

// Data usable with prom-histogram schema
def linearPromHistSeries(startTs: Long = 100000L, numSeries: Int = 10, timeStep: Int = 1000, numBuckets: Int = 8):
Stream[Seq[Any]] = linearHistSeries(startTs, numSeries, timeStep, numBuckets).map { d =>
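The generator above follows OTel's base-2 exponential bucketing. A small sketch of the boundary rule, stated as an assumption drawn from the OpenTelemetry exponential-histogram spec rather than from this PR's code:

// Assumed boundary rule for Base2ExpHistogramBuckets:
// base = 2^(2^-scale); positive bucket i covers (base^i, base^(i+1)].
val scale = 3                                  // matches the generator above
val base = math.pow(2, math.pow(2, -scale))    // ~1.0905 for scale 3
def upperBound(index: Int): Double = math.pow(base, index + 1)
// With startIndexPositiveBuckets = -20 as above, the first positive bucket's
// upper bound is upperBound(-20) = base^(-19), roughly 0.19; larger scales
// give geometrically finer buckets.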
gateway/src/main/scala/filodb/gateway/GatewayServer.scala (12 additions, 5 deletions)
@@ -84,6 +84,8 @@ object GatewayServer extends StrictLogging {
descrYes = "Generate otel-cumulative-histogram schema test data and exit")
val genOtelDeltaHistData = toggle(noshort = true,
descrYes = "Generate otel-delta-histogram schema test data and exit")
+ val genOtelExpDeltaHistData = toggle(noshort = true,
+ descrYes = "Generate otel-exponential-delta-histogram schema test data and exit")
val genGaugeData = toggle(noshort = true, descrYes = "Generate Prometheus gauge-schema test data and exit")
val genCounterData = toggle(noshort = true, descrYes = "Generate Prometheus counter-schema test data and exit")
val genDeltaCounterData = toggle(noshort = true, descrYes = "Generate delta-counter-schema test data and exit")
@@ -140,9 +142,11 @@ object GatewayServer extends StrictLogging {
val genDeltaCounterData = userOpts.genDeltaCounterData.getOrElse(false)
val genOtelCumulativeHistData = userOpts.genOtelCumulativeHistData.getOrElse(false)
val genOtelDeltaHistData = userOpts.genOtelDeltaHistData.getOrElse(false)
+ val genOtelExpDeltaHistData = userOpts.genOtelExpDeltaHistData.getOrElse(false)

if (genHist || genGaugeData || genDeltaHist
- || genCounterData || genDeltaCounterData || genOtelDeltaHistData || genOtelCumulativeHistData) {
+ || genCounterData || genDeltaCounterData || genOtelDeltaHistData ||
+ genOtelExpDeltaHistData || genOtelCumulativeHistData) {
val startTime = System.currentTimeMillis
logger.info(s"Generating $numSamples samples starting at $startTime....")

@@ -151,7 +155,10 @@
otelCumulativeHistogram)
else if (genOtelDeltaHistData) TestTimeseriesProducer.genHistogramData(startTime, numSeries,
otelDeltaHistogram)
- else if (genDeltaHist) TestTimeseriesProducer.genHistogramData(startTime, numSeries, deltaHistogram)
+ else if (genOtelExpDeltaHistData) TestTimeseriesProducer.genHistogramData(startTime, numSeries,
+ otelDeltaHistogram, otelExponential = true)
+ else if (genDeltaHist)
+ TestTimeseriesProducer.genHistogramData(startTime, numSeries, deltaHistogram)
else if (genGaugeData) TestTimeseriesProducer.timeSeriesData(startTime, numSeries,
userOpts.numMetrics(), userOpts.publishIntervalSecs(), gauge)
else if (genDeltaCounterData) TestTimeseriesProducer.timeSeriesData(startTime, numSeries,
@@ -170,8 +177,8 @@
}
Thread sleep 10000
TestTimeseriesProducer.logQueryHelp(dataset.name, userOpts.numMetrics(), numSamples, numSeries,
- startTime, genHist, genDeltaHist, genGaugeData, genCounterData, genOtelCumulativeHistData, genOtelDeltaHistData,
- userOpts.publishIntervalSecs())
+ startTime, genHist, genDeltaHist, genGaugeData, genCounterData, genOtelCumulativeHistData,
+ genOtelDeltaHistData, genOtelExpDeltaHistData, userOpts.publishIntervalSecs())
logger.info(s"Waited for containers to be sent, exiting...")
sys.exit(0)
} else {
@@ -260,7 +267,7 @@ object GatewayServer extends StrictLogging {
producing(shard) = false
output
}
- }
+ }.filter { case (_, j) => j.nonEmpty }
logger.info(s"Created $numShards container builder queues with $parallelism parallel workers...")
(shardQueues, containerStream)
}
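The new .filter on the container stream drops (shard, batch) pairs that carry no containers, so empty flushes are never published downstream. A toy illustration of the same guard, with illustrative values:

// Toy version of the guard added above: drop (shard, batch) pairs with no containers.
val batches = Seq(1 -> Seq.empty[Array[Byte]], 2 -> Seq(Array[Byte](1, 2)))
val nonEmpty = batches.filter { case (_, containers) => containers.nonEmpty }
// nonEmpty keeps only shard 2's batch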
gateway/src/main/scala/filodb/gateway/conversion/InputRecord.scala (60 additions, 2 deletions)
@@ -5,7 +5,7 @@ import remote.RemoteStorage.TimeSeries
import filodb.core.binaryrecord2.RecordBuilder
import filodb.core.metadata.{DatasetOptions, Schema}
import filodb.memory.format.{SeqRowReader, ZeroCopyUTF8String => ZCUTF8}
- import filodb.memory.format.vectors.{CustomBuckets, LongHistogram}
+ import filodb.memory.format.vectors.{Base2ExpHistogramBuckets, CustomBuckets, LongHistogram}

/**
* An InputRecord represents one "record" of timeseries data for input to FiloDB system.
@@ -182,7 +182,7 @@ object InputRecord {
}

/**
- * Writes a delta non-increasing histogram record, along with the sum, count, min and max
+ * Writes a delta non-decreasing histogram record, along with the sum, count, min and max
*/
def writeOtelDeltaHistRecord(builder: RecordBuilder,
metric: String,
@@ -230,6 +230,64 @@ object InputRecord {
}
}

/**
* Writes a non-decreasing histogram record, along with the sum, count, min and max
*/
//scalastyle:off method.length
def writeOtelExponentialHistRecord(builder: RecordBuilder,
metric: String,
tags: Map[String, String],
timestamp: Long,
kvs: Seq[(String, Double)],
isDelta: Boolean): Unit = {
var sum = Double.NaN
var count = Double.NaN
var min = Double.NaN
var max = Double.NaN
var posBucketOffset = Double.NaN
var scale = Double.NaN

// Filter out sum and count, then convert and sort buckets
val sortedBuckets = kvs.filter {
case ("sum", v) => sum = v
false
case ("count", v) => count = v
false
case ("min", v) => min = v
false
case ("max", v) => max = v
false
case ("posBucketOffset", v) => posBucketOffset = v
false
case ("scale", v) => scale = v
false
case other => true
}.map {
case (k, v) => (k.toInt, v.toLong)
}.sorted

val bucketValues = sortedBuckets.map(_._2).toArray

if (sortedBuckets.nonEmpty) {
// length - 1 because the zero bucket is not included in the positive bucket count
val buckets = Base2ExpHistogramBuckets(scale.toInt, posBucketOffset.toInt, sortedBuckets.length - 1)
val hist = LongHistogram(buckets, bucketValues)

// Now, write out histogram
builder.startNewRecord(if (isDelta) otelDeltaHistogram else otelCumulativeHistogram)
builder.addLong(timestamp)
builder.addDouble(sum)
builder.addDouble(count)
builder.addBlob(hist.serialize())
builder.addDouble(min)
builder.addDouble(max)

builder.addString(metric)
builder.addMap(tags.map { case (k, v) => (k.utf8, v.utf8) })
builder.endRecord()
}
}

/**
* Writes a delta non-increasing histogram record, along with the sum and count,
* using the delta-histogram schema, storing the entire histogram together for efficiency.
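As the parsing code above shows, the writer takes a flat key/value sequence: reserved keys sum, count, min, max, scale and posBucketOffset, plus stringified bucket indices mapped to cumulative counts, with index 0 holding the zero bucket. A hedged usage sketch with illustrative values, mirroring the unit test further down:

// Sketch only: values are illustrative; builder setup mirrors the unit test below.
import filodb.core.binaryrecord2.RecordBuilder
import filodb.memory.MemFactory

val builder = new RecordBuilder(MemFactory.onHeapFactory)
val kvs = Seq(
  "sum" -> 10.0, "count" -> 6.0, "min" -> 1.0, "max" -> 4.0,
  "scale" -> 3.0, "posBucketOffset" -> -5.0,
  "0" -> 1.0, "1" -> 3.0, "2" -> 6.0 // cumulative counts: zero bucket + two positive buckets
)
InputRecord.writeOtelExponentialHistRecord(builder, "http_request_latency_delta",
  Map("_ws_" -> "demo", "_ns_" -> "testapp"), 100000L, kvs, isDelta = true)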
@@ -64,15 +64,17 @@ object TestTimeseriesProducer extends StrictLogging {
//scalastyle:off method.length parameter.number
def logQueryHelp(dataset: String, numMetrics: Int, numSamples: Int, numTimeSeries: Int, startTimeMs: Long,
genHist: Boolean, genDeltaHist: Boolean, genGauge: Boolean,
- genPromCounter: Boolean, genOtelCumulativeHistData: Boolean, genOtelDeltaHistData: Boolean,
+ genPromCounter: Boolean, genOtelCumulativeHistData: Boolean,
+ genOtelDeltaHistData: Boolean, genOtelExpDeltaHistData: Boolean,
publishIntervalSec: Int): Unit = {
val startQuery = startTimeMs / 1000
val endQuery = startQuery + (numSamples / numMetrics / numTimeSeries) * publishIntervalSec
logger.info(s"Finished producing $numSamples records for ${(endQuery-startQuery).toDouble/60} minutes")

val metricName = if (genGauge) "heap_usage0"
else if (genHist || genOtelCumulativeHistData) "http_request_latency"
- else if (genDeltaHist || genOtelDeltaHistData) "http_request_latency_delta"
+ else if (genDeltaHist || genOtelDeltaHistData || genOtelExpDeltaHistData)
+ "http_request_latency_delta"
else if (genPromCounter) "heap_usage_counter0"
else "heap_usage_delta0"

@@ -98,15 +100,19 @@
dataset: Dataset,
shardMapper: ShardMapper,
spread: Int,
- publishIntervalSec: Int): (Future[Unit], Observable[(Int, Seq[Array[Byte]])]) = {
+ publishIntervalSec: Int,
+ expHist: Boolean = false,
+ numBuckets: Int = 20): (Future[Unit], Observable[(Int, Seq[Array[Byte]])]) = {
val (shardQueues, containerStream) = GatewayServer.shardingPipeline(GlobalConfig.systemConfig, numShards, dataset)

val producingFut = Future {
- timeSeriesData(startTimeMs, numTimeSeries, numMetricNames, publishIntervalSec, gauge)
- .take(numSamples)
+ val data = if (expHist) genHistogramData(startTimeMs, numTimeSeries,
+ Schemas.otelDeltaHistogram, numBuckets = numBuckets, otelExponential = true)
+ else timeSeriesData(startTimeMs, numTimeSeries, numMetricNames, publishIntervalSec, gauge)
+ data.take(numSamples)
.foreach { rec =>
val shard = shardMapper.ingestionShard(rec.shardKeyHash, rec.partitionKeyHash, spread)
while (!shardQueues(shard).offer(rec)) { Thread sleep 50 }
}
}
(producingFut, containerStream)
@@ -203,18 +209,20 @@
* Note: the set of "instance" tags is unique for each invocation of genHistogramData. This helps increase
* the cardinality of time series for testing purposes.
*/
- def genHistogramData(startTime: Long, numTimeSeries: Int = 16, histSchema: Schema): Stream[InputRecord] = {
- val numBuckets = 10
- val histBucketScheme = bv.GeometricBuckets(2.0, 3.0, numBuckets)
- var buckets = new Array[Long](numBuckets)
+ def genHistogramData(startTime: Long, numTimeSeries: Int = 16, histSchema: Schema,
+ numBuckets: Int = 20,
+ otelExponential: Boolean = false): Stream[InputRecord] = {
+ val histBucketScheme = if (otelExponential) bv.Base2ExpHistogramBuckets(3, -numBuckets/2, numBuckets)
+ else bv.GeometricBuckets(2.0, 3.0, numBuckets)
+ var buckets = new Array[Long](histBucketScheme.numBuckets)
val metric = if (Schemas.deltaHistogram == histSchema || Schemas.otelDeltaHistogram == histSchema) {
"http_request_latency_delta"
} else {
"http_request_latency"
}

def updateBuckets(bucketNo: Int): Unit = {
- for { b <- bucketNo until numBuckets } {
+ for { b <- bucketNo until histBucketScheme.numBuckets } {
buckets(b) += 1
}
}
@@ -232,9 +240,9 @@
if ( (Schemas.deltaHistogram == histSchema || Schemas.otelDeltaHistogram == histSchema )
&& prevTimestamp != timestamp) {
prevTimestamp = timestamp
- buckets = new Array[Long](numBuckets)
+ buckets = new Array[Long](histBucketScheme.numBuckets)
}
- updateBuckets(n % numBuckets)
+ updateBuckets(n % histBucketScheme.numBuckets)
val hist = bv.LongHistogram(histBucketScheme, buckets.map(x => x))
val count = util.Random.nextInt(100).toDouble
val sum = buckets.sum.toDouble
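With the new parameters, exponential-histogram test data comes from the same entry point. A brief usage sketch; the sample count is arbitrary, and capping with take is an assumption about the generator being unbounded:

// Sketch: generate OTel exponential-histogram records against the delta schema.
import filodb.core.metadata.Schemas

val records = TestTimeseriesProducer.genHistogramData(
  startTime = System.currentTimeMillis(),
  numTimeSeries = 16,
  histSchema = Schemas.otelDeltaHistogram,
  numBuckets = 20,
  otelExponential = true
).take(100) // take(100) is arbitrary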
@@ -4,13 +4,14 @@ package filodb.gateway.conversion
import filodb.core.binaryrecord2.RecordBuilder
import filodb.core.metadata.Schemas
import filodb.memory.MemFactory
- import filodb.memory.format.vectors.{CustomBuckets, LongHistogram}
+ import filodb.memory.format.vectors.{Base2ExpHistogramBuckets, CustomBuckets, LongHistogram}
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers

class InputRecordBuilderSpec extends AnyFunSpec with Matchers {
val builder = new RecordBuilder(MemFactory.onHeapFactory)
val builder2 = new RecordBuilder(MemFactory.onHeapFactory)
+ val builder3 = new RecordBuilder(MemFactory.onHeapFactory)

val baseTags = Map("dataset" -> "timeseries",
"host" -> "MacBook-Pro-229.local",
@@ -97,6 +98,32 @@ class InputRecordBuilderSpec extends AnyFunSpec with Matchers {
}
}

it("should otelExpDeltaHistogram to BR and be able to deserialize it") {
val bucketScheme = Base2ExpHistogramBuckets(3, -5, 10)
val bucketsCounts = Array(1L, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11) // require cumulative counts
val expected = LongHistogram(bucketScheme, bucketsCounts)

val bucketKVs = bucketsCounts.zipWithIndex.map {
case (bucketCount, i) => i.toString -> bucketCount.toDouble
}.toSeq

// add posBucketOffset and scale
val more = Seq("posBucketOffset" -> bucketScheme.startIndexPositiveBuckets.toDouble,
"scale" -> bucketScheme.scale.toDouble)

InputRecord.writeOtelExponentialHistRecord(builder3, metric, baseTags, 100000L,
bucketKVs ++ sumCountMinMaxKVs ++ more, isDelta = true)
builder3.allContainers.head.iterate(Schemas.otelDeltaHistogram.ingestionSchema).foreach { row =>
row.getDouble(1) shouldEqual sum
row.getDouble(2) shouldEqual count
row.getDouble(4) shouldEqual min
row.getDouble(5) shouldEqual max
val hist = row.getHistogram(3).asInstanceOf[LongHistogram]
hist.buckets shouldEqual expected.buckets
hist.values shouldEqual Array(1L, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
}
}

it("should skip empty histograms via writePromHistRecord, and write subsequent records") {
builder.reset()
InputRecord.writePromHistRecord(builder, metric, baseTags, 100000L, sumCountKVs)
grpc/src/main/protobuf/query_service.proto (1 addition)
@@ -597,6 +597,7 @@ enum InstantFunctionId {
MONTH = 20;
YEAR = 21;
OR_VECTOR_DOUBLE = 22;
+ HISTOGRAM_FRACTION = 23;
}

enum ScalarFunctionId {