Skip to content

Commit

Permalink
Input Record changes for OTel exponential histograms
Browse files Browse the repository at this point in the history
  • Loading branch information
vishramachandran committed Nov 6, 2024
1 parent 543a140 commit 464aade
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 3 deletions.
66 changes: 64 additions & 2 deletions gateway/src/main/scala/filodb/gateway/conversion/InputRecord.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import remote.RemoteStorage.TimeSeries
import filodb.core.binaryrecord2.RecordBuilder
import filodb.core.metadata.{DatasetOptions, Schema}
import filodb.memory.format.{SeqRowReader, ZeroCopyUTF8String => ZCUTF8}
import filodb.memory.format.vectors.{CustomBuckets, LongHistogram}
import filodb.memory.format.vectors.{Base2ExpHistogramBuckets, CustomBuckets, LongHistogram}

/**
* An InputRecord represents one "record" of timeseries data for input to FiloDB system.
Expand Down Expand Up @@ -182,7 +182,7 @@ object InputRecord {
}

/**
* Writes a delta non-increasing histogram record, along with the sum, count, min and max
* Writes a delta non-decreasing histogram record, along with the sum, count, min and max
*/
def writeOtelDeltaHistRecord(builder: RecordBuilder,
metric: String,
Expand Down Expand Up @@ -230,6 +230,68 @@ object InputRecord {
}
}

/**
* Writes a non-decreasing histogram record, along with the sum, count, min and max
*/
//scalastyle:off method.length
def writeOtelExponentialHistRecord(builder: RecordBuilder,
metric: String,
tags: Map[String, String],
timestamp: Long,
kvs: Seq[(String, Double)],
isDelta: Boolean): Unit = {
var sum = Double.NaN
var count = Double.NaN
var min = Double.NaN
var max = Double.NaN
var posBucketOffset = Double.NaN
var scale = Double.NaN

// Filter out sum and count, then convert and sort buckets
val sortedBuckets = kvs.filter {
case ("sum", v) => sum = v
false
case ("count", v) => count = v
false
case ("min", v) => min = v
false
case ("max", v) => max = v
false
case ("posBucketOffset", v) => posBucketOffset = v
false
case ("scale", v) => scale = v
false
case other => true
}.map {
case (k, v) => (k.toInt, v.toLong)
}.sorted

// convert buckets to non-decreasing (Otel exponential histogram buckets are not cumulative)
val bucketValues = sortedBuckets.map(_._2).toArray
for { i <- 1 until bucketValues.length } {
bucketValues(i) += bucketValues(i - 1)
}

if (sortedBuckets.nonEmpty) {
// length - 1 because the zero bucket is not included in the positive bucket count
val buckets = Base2ExpHistogramBuckets(scale.toInt, posBucketOffset.toInt, sortedBuckets.length - 1)
val hist = LongHistogram(buckets, bucketValues)

// Now, write out histogram
builder.startNewRecord(if (isDelta) otelDeltaHistogram else otelCumulativeHistogram)
builder.addLong(timestamp)
builder.addDouble(sum)
builder.addDouble(count)
builder.addBlob(hist.serialize())
builder.addDouble(min)
builder.addDouble(max)

builder.addString(metric)
builder.addMap(tags.map { case (k, v) => (k.utf8, v.utf8) })
builder.endRecord()
}
}

/**
* Writes a delta non-increasing histogram record, along with the sum and count,
* using the delta-histogram schema, storing the entire histogram together for efficiency.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ package filodb.gateway.conversion
import filodb.core.binaryrecord2.RecordBuilder
import filodb.core.metadata.Schemas
import filodb.memory.MemFactory
import filodb.memory.format.vectors.{CustomBuckets, LongHistogram}
import filodb.memory.format.vectors.{Base2ExpHistogramBuckets, CustomBuckets, LongHistogram}
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers

Expand Down Expand Up @@ -97,6 +97,32 @@ class InputRecordBuilderSpec extends AnyFunSpec with Matchers {
}
}

it("should otelExpDeltaHistogram to BR and be able to deserialize it") {
val bucketScheme = Base2ExpHistogramBuckets(3, -5, 10)
val bucketsCounts = Array(6L, 4, 3, 8, 9, 2, 4, 5, 6, 7, 3) // not cumulative
val expected = LongHistogram(bucketScheme, bucketsCounts)

val bucketKVs = bucketsCounts.zipWithIndex.map {
case (bucketCount, i) => i.toString -> bucketCount.toDouble
}.toSeq

// add posBucketOffset and scale
val more = Seq("posBucketOffset" -> bucketScheme.startIndexPositiveBuckets.toDouble,
"scale" -> bucketScheme.scale.toDouble)

InputRecord.writeOtelExponentialHistRecord(builder2, metric, baseTags, 100000L,
bucketKVs ++ sumCountMinMaxKVs ++ more, isDelta = true)
builder2.allContainers.head.iterate(Schemas.otelDeltaHistogram.ingestionSchema).foreach { row =>
row.getDouble(1) shouldEqual sum
row.getDouble(2) shouldEqual count
row.getDouble(4) shouldEqual min
row.getDouble(5) shouldEqual max
val hist = row.getHistogram(3).asInstanceOf[LongHistogram]
hist.buckets shouldEqual expected.buckets
hist.values shouldEqual Array(6L, 10, 13, 21, 30, 32, 36, 41, 47, 54, 57) // should be converted to cumulative
}
}

it("should skip empty histograms via writePromHistRecord, and write subsequent records") {
builder.reset()
InputRecord.writePromHistRecord(builder, metric, baseTags, 100000L, sumCountKVs)
Expand Down

0 comments on commit 464aade

Please sign in to comment.