diff --git a/coordinator/src/main/scala/filodb.coordinator/NodeClusterActor.scala b/coordinator/src/main/scala/filodb.coordinator/NodeClusterActor.scala index 146540f6d6..b4df27a496 100644 --- a/coordinator/src/main/scala/filodb.coordinator/NodeClusterActor.scala +++ b/coordinator/src/main/scala/filodb.coordinator/NodeClusterActor.scala @@ -55,6 +55,7 @@ object NodeClusterActor { * @param source the IngestionSource on each node. Use noOpSource to not start ingestion and * manually push records into NodeCoordinator. * @param storeConfig a StoreConfig for the MemStore. + * @param overrideSchema if true, will override the schema in the MetaStore with the schema in the Dataset. * @return DatasetVerified - meaning the dataset and columns are valid. Does not mean ingestion is * setup on all nodes - for that, subscribe to ShardMapUpdate's */ @@ -62,7 +63,8 @@ object NodeClusterActor { resources: DatasetResourceSpec, source: IngestionSource, storeConfig: StoreConfig, - downsampleConfig: DownsampleConfig = DownsampleConfig.disabled) { + downsampleConfig: DownsampleConfig = DownsampleConfig.disabled, + overrideSchema: Boolean = true) { import collection.JavaConverters._ val resourceConfig = ConfigFactory.parseMap( Map("num-shards" -> resources.numShards, "min-num-nodes" -> resources.minNumNodes).asJava) diff --git a/coordinator/src/main/scala/filodb.coordinator/NodeCoordinatorActor.scala b/coordinator/src/main/scala/filodb.coordinator/NodeCoordinatorActor.scala index 24caf2ae45..c67ac65370 100644 --- a/coordinator/src/main/scala/filodb.coordinator/NodeCoordinatorActor.scala +++ b/coordinator/src/main/scala/filodb.coordinator/NodeCoordinatorActor.scala @@ -189,10 +189,10 @@ private[filodb] final class NodeCoordinatorActor(metaStore: MetaStore, } def ingestHandlers: Receive = LoggingReceive { - case SetupDataset(dataset, resources, source, storeConf, downsample) => + case SetupDataset(dataset, resources, source, storeConf, downsample, overrideSchema) => // used only in unit 
tests if (!(ingesters contains dataset.ref)) { - setupDataset(dataset, storeConf, resources.numShards, source, downsample, true) + setupDataset(dataset, storeConf, resources.numShards, source, downsample, overrideSchema) } case IngestRows(dataset, shard, rows) => diff --git a/coordinator/src/main/scala/filodb/coordinator/ProtoConverters.scala b/coordinator/src/main/scala/filodb/coordinator/ProtoConverters.scala index feb3a158d1..32355df907 100644 --- a/coordinator/src/main/scala/filodb/coordinator/ProtoConverters.scala +++ b/coordinator/src/main/scala/filodb/coordinator/ProtoConverters.scala @@ -898,6 +898,7 @@ object ProtoConverters { case filodb.query.InstantFunctionId.HistogramQuantile => GrpcMultiPartitionQueryService.InstantFunctionId.HISTOGRAM_QUANTILE case filodb.query.InstantFunctionId.HistogramMaxQuantile => GrpcMultiPartitionQueryService.InstantFunctionId.HISTOGRAM_MAX_QUANTILE case filodb.query.InstantFunctionId.HistogramBucket => GrpcMultiPartitionQueryService.InstantFunctionId.HISTOGRAM_BUCKET + case filodb.query.InstantFunctionId.HistogramFraction => GrpcMultiPartitionQueryService.InstantFunctionId.HISTOGRAM_FRACTION case filodb.query.InstantFunctionId.Ln => GrpcMultiPartitionQueryService.InstantFunctionId.LN case filodb.query.InstantFunctionId.Log10 => GrpcMultiPartitionQueryService.InstantFunctionId.LOG10 case filodb.query.InstantFunctionId.Log2 => GrpcMultiPartitionQueryService.InstantFunctionId.LOG2 @@ -929,6 +930,7 @@ object ProtoConverters { case GrpcMultiPartitionQueryService.InstantFunctionId.HISTOGRAM_QUANTILE => filodb.query.InstantFunctionId.HistogramQuantile case GrpcMultiPartitionQueryService.InstantFunctionId.HISTOGRAM_MAX_QUANTILE => filodb.query.InstantFunctionId.HistogramMaxQuantile case GrpcMultiPartitionQueryService.InstantFunctionId.HISTOGRAM_BUCKET => filodb.query.InstantFunctionId.HistogramBucket + case GrpcMultiPartitionQueryService.InstantFunctionId.HISTOGRAM_FRACTION => filodb.query.InstantFunctionId.HistogramFraction case 
GrpcMultiPartitionQueryService.InstantFunctionId.LN => filodb.query.InstantFunctionId.Ln
       case GrpcMultiPartitionQueryService.InstantFunctionId.LOG10 => filodb.query.InstantFunctionId.Log10
       case GrpcMultiPartitionQueryService.InstantFunctionId.LOG2 => filodb.query.InstantFunctionId.Log2
diff --git a/core/src/main/scala/filodb.core/metadata/Schemas.scala b/core/src/main/scala/filodb.core/metadata/Schemas.scala
index f15b701ce5..d72f5e1547 100644
--- a/core/src/main/scala/filodb.core/metadata/Schemas.scala
+++ b/core/src/main/scala/filodb.core/metadata/Schemas.scala
@@ -284,7 +284,7 @@ final case class Schemas(part: PartitionSchema,
   }
 
   private def bytesPerSampleSwagString = bytesPerSampleSwag.map { case (k, v) =>
-    s"${schemaName(k._1)} ColId:${k._2} : $v"
+    s"${schemaName(k._1)} ${k._1} ColId:${k._2} : $v"
   }
 
   Schemas._log.info(s"bytesPerSampleSwag: $bytesPerSampleSwagString")
diff --git a/core/src/test/scala/filodb.core/TestData.scala b/core/src/test/scala/filodb.core/TestData.scala
index 674486cc05..0717443e8f 100644
--- a/core/src/test/scala/filodb.core/TestData.scala
+++ b/core/src/test/scala/filodb.core/TestData.scala
@@ -356,6 +356,10 @@ object MachineMetricsData {
     Seq("timestamp:ts", "count:long", "sum:long", "h:hist:counter=false"),
     options = DatasetOptions(shardKeyColumns = Seq("_ws_", "_ns_", "metric"), "metric"))
 
+  val expHistDataset = Dataset("histogram", Seq("metric:string", "tags:map"),
+    Seq("timestamp:ts", "count:long", "sum:long", "min:long", "max:long", "h:hist:counter=false"),
+    options = DatasetOptions(shardKeyColumns = Seq("_ws_", "_ns_", "metric"), "metric"))
+
   var histBucketScheme: bv.HistogramBuckets = _
   def linearHistSeries(startTs: Long = 100000L, numSeries: Int = 10, timeStep: Int = 1000, numBuckets: Int = 8,
                        infBucket: Boolean = false, ws: String = "demo"):
@@ -383,6 +387,34 @@ object MachineMetricsData {
     }
   }
 
+  /**
+   * Endless stream of rows laid out like expHistDataset's columns: timestamp, count, sum, min, max,
+   * histogram, metric, tags. Reassigns the shared histBucketScheme to Base2ExpHistogramBuckets(3, -20,
+   * numBuckets - 1); bucket counts are never reset, so they keep growing from one row to the next.
+   */
+  def otelDeltaExponentialHistSeries(startTs: Long = 100000L, numSeries: Int = 10,
+                                     timeStep: Int = 10000, numBuckets: Int = 160, ws: String = "demo"):
+  Stream[Seq[Any]] = {
+    histBucketScheme = bv.Base2ExpHistogramBuckets(3, -20, numBuckets - 1)
+    val buckets = new Array[Long](numBuckets)
+    def updateBuckets(bucketNo: Int): Unit = {
+      for { b <- bucketNo until numBuckets } {
+        buckets(b) += 1
+      }
+    }
+    Stream.from(0).map { n =>
+      updateBuckets(n % numBuckets)
+      Seq(startTs + n.toLong * timeStep,   // widen before multiplying so large n cannot overflow Int
+          (1 + n).toLong,
+          buckets.sum,
+          buckets.min,
+          buckets.max,
+          bv.LongHistogram(histBucketScheme, buckets.map(x => x)),
+          "request-latency",
+          extraTags ++ Map("_ws_".utf8 -> ws.utf8, "_ns_".utf8 -> "testapp".utf8, "dc".utf8 -> s"${n % numSeries}".utf8))
+    }
+  }
+
   // Data usable with prom-histogram schema
   def linearPromHistSeries(startTs: Long = 100000L, numSeries: Int = 10, timeStep: Int = 1000, numBuckets: Int = 8):
   Stream[Seq[Any]] = linearHistSeries(startTs, numSeries, timeStep, numBuckets).map { d =>
diff --git a/gateway/src/main/scala/filodb/gateway/GatewayServer.scala b/gateway/src/main/scala/filodb/gateway/GatewayServer.scala
index 3929acb47b..987f1801a9 100644
--- a/gateway/src/main/scala/filodb/gateway/GatewayServer.scala
+++ b/gateway/src/main/scala/filodb/gateway/GatewayServer.scala
@@ -84,6 +84,8 @@ object GatewayServer extends StrictLogging {
                                            descrYes = "Generate otel-cumulative-histogram schema test data and exit")
     val genOtelDeltaHistData = toggle(noshort = true,
                                       descrYes = "Generate otel-delta-histogram schema test data and exit")
+    val genOtelExpDeltaHistData = toggle(noshort = true,
+                                      descrYes = "Generate otel-exponential-delta-histogram schema test data and exit")
     val genGaugeData = toggle(noshort = true, descrYes = "Generate Prometheus gauge-schema test data and exit")
     val genCounterData = toggle(noshort = true, descrYes = "Generate Prometheus counter-schema test data and exit")
     val genDeltaCounterData = toggle(noshort = true, descrYes = "Generate delta-counter-schema test data and exit")
@@ -140,9 +142,11 @@ object GatewayServer extends StrictLogging {
     val genDeltaCounterData =
userOpts.genDeltaCounterData.getOrElse(false) val genOtelCumulativeHistData = userOpts.genOtelCumulativeHistData.getOrElse(false) val genOtelDeltaHistData = userOpts.genOtelDeltaHistData.getOrElse(false) + val genOtelExpDeltaHistData = userOpts.genOtelExpDeltaHistData.getOrElse(false) if (genHist || genGaugeData || genDeltaHist - || genCounterData || genDeltaCounterData || genOtelDeltaHistData || genOtelCumulativeHistData) { + || genCounterData || genDeltaCounterData || genOtelDeltaHistData || + genOtelExpDeltaHistData || genOtelCumulativeHistData) { val startTime = System.currentTimeMillis logger.info(s"Generating $numSamples samples starting at $startTime....") @@ -151,7 +155,10 @@ object GatewayServer extends StrictLogging { otelCumulativeHistogram) else if (genOtelDeltaHistData) TestTimeseriesProducer.genHistogramData(startTime, numSeries, otelDeltaHistogram) - else if (genDeltaHist) TestTimeseriesProducer.genHistogramData(startTime, numSeries, deltaHistogram) + else if (genOtelExpDeltaHistData) TestTimeseriesProducer.genHistogramData(startTime, numSeries, + otelDeltaHistogram, otelExponential = true) + else if (genDeltaHist) + TestTimeseriesProducer.genHistogramData(startTime, numSeries, deltaHistogram) else if (genGaugeData) TestTimeseriesProducer.timeSeriesData(startTime, numSeries, userOpts.numMetrics(), userOpts.publishIntervalSecs(), gauge) else if (genDeltaCounterData) TestTimeseriesProducer.timeSeriesData(startTime, numSeries, @@ -170,8 +177,8 @@ object GatewayServer extends StrictLogging { } Thread sleep 10000 TestTimeseriesProducer.logQueryHelp(dataset.name, userOpts.numMetrics(), numSamples, numSeries, - startTime, genHist, genDeltaHist, genGaugeData, genCounterData, genOtelCumulativeHistData, genOtelDeltaHistData, - userOpts.publishIntervalSecs()) + startTime, genHist, genDeltaHist, genGaugeData, genCounterData, genOtelCumulativeHistData, + genOtelDeltaHistData, genOtelExpDeltaHistData, userOpts.publishIntervalSecs()) logger.info(s"Waited for 
containers to be sent, exiting...") sys.exit(0) } else { @@ -260,7 +267,7 @@ object GatewayServer extends StrictLogging { producing(shard) = false output } - } + }.filter { case (_, j) => j.nonEmpty } logger.info(s"Created $numShards container builder queues with $parallelism parallel workers...") (shardQueues, containerStream) } diff --git a/gateway/src/main/scala/filodb/gateway/conversion/InputRecord.scala b/gateway/src/main/scala/filodb/gateway/conversion/InputRecord.scala index e7dbd77607..eee6149de1 100644 --- a/gateway/src/main/scala/filodb/gateway/conversion/InputRecord.scala +++ b/gateway/src/main/scala/filodb/gateway/conversion/InputRecord.scala @@ -5,7 +5,7 @@ import remote.RemoteStorage.TimeSeries import filodb.core.binaryrecord2.RecordBuilder import filodb.core.metadata.{DatasetOptions, Schema} import filodb.memory.format.{SeqRowReader, ZeroCopyUTF8String => ZCUTF8} -import filodb.memory.format.vectors.{CustomBuckets, LongHistogram} +import filodb.memory.format.vectors.{Base2ExpHistogramBuckets, CustomBuckets, LongHistogram} /** * An InputRecord represents one "record" of timeseries data for input to FiloDB system. 
@@ -182,7 +182,7 @@ object InputRecord {
   }
 
   /**
-   * Writes a delta non-increasing histogram record, along with the sum, count, min and max
+   * Writes a delta non-decreasing histogram record, along with the sum, count, min and max
    */
   def writeOtelDeltaHistRecord(builder: RecordBuilder,
                                metric: String,
@@ -230,6 +230,69 @@ object InputRecord {
     }
   }
 
+  /**
+   * Writes a non-decreasing base-2 exponential histogram record, along with the sum, count, min and max.
+   * kvs carries "sum", "count", "min", "max", "scale" and "posBucketOffset"; every other key must be an
+   * integer bucket index, with the bucket's value as its value.
+   * Nothing is written when kvs has no bucket entries, or when "scale" or "posBucketOffset" is missing.
+   * @param isDelta selects the otelDeltaHistogram schema when true, otelCumulativeHistogram otherwise
+   */
+  //scalastyle:off method.length
+  def writeOtelExponentialHistRecord(builder: RecordBuilder,
+                                     metric: String,
+                                     tags: Map[String, String],
+                                     timestamp: Long,
+                                     kvs: Seq[(String, Double)],
+                                     isDelta: Boolean): Unit = {
+    var sum = Double.NaN
+    var count = Double.NaN
+    var min = Double.NaN
+    var max = Double.NaN
+    var posBucketOffset = Double.NaN
+    var scale = Double.NaN
+
+    // Pull the scalar fields out of kvs; every remaining entry is a (bucket index, value) pair
+    val sortedBuckets = kvs.filter {
+      case ("sum", v) => sum = v
+                         false
+      case ("count", v) => count = v
+                           false
+      case ("min", v) => min = v
+                         false
+      case ("max", v) => max = v
+                         false
+      case ("posBucketOffset", v) => posBucketOffset = v
+                                     false
+      case ("scale", v) => scale = v
+                           false
+      case _ => true
+    }.map {
+      case (k, v) => (k.toInt, v.toLong)
+    }.sorted
+
+    val bucketValues = sortedBuckets.map(_._2).toArray
+
+    // NaN.toInt is 0, so a missing scale or posBucketOffset would silently pick a wrong bucket scheme
+    if (sortedBuckets.nonEmpty && !scale.isNaN && !posBucketOffset.isNaN) {
+      // length - 1 because the zero bucket is not included in the positive bucket count
+      val buckets = Base2ExpHistogramBuckets(scale.toInt, posBucketOffset.toInt, sortedBuckets.length - 1)
+      val hist = LongHistogram(buckets, bucketValues)
+
+      // Now, write out histogram
+      builder.startNewRecord(if (isDelta) otelDeltaHistogram else otelCumulativeHistogram)
+      builder.addLong(timestamp)
+      builder.addDouble(sum)
+      builder.addDouble(count)
+      builder.addBlob(hist.serialize())
+      builder.addDouble(min)
+      builder.addDouble(max)
+
+      builder.addString(metric)
+      builder.addMap(tags.map { case (k, v) => (k.utf8, v.utf8) })
+      builder.endRecord()
+    }
+  }
+
   /**
    * Writes
a delta non-increasing histogram record, along with the sum and count, * using the delta-histogram schema, storing the entire histogram together for efficiency. diff --git a/gateway/src/main/scala/filodb/timeseries/TestTimeseriesProducer.scala b/gateway/src/main/scala/filodb/timeseries/TestTimeseriesProducer.scala index 73ac7754d2..f75fcec93b 100644 --- a/gateway/src/main/scala/filodb/timeseries/TestTimeseriesProducer.scala +++ b/gateway/src/main/scala/filodb/timeseries/TestTimeseriesProducer.scala @@ -64,7 +64,8 @@ object TestTimeseriesProducer extends StrictLogging { //scalastyle:off method.length parameter.number def logQueryHelp(dataset: String, numMetrics: Int, numSamples: Int, numTimeSeries: Int, startTimeMs: Long, genHist: Boolean, genDeltaHist: Boolean, genGauge: Boolean, - genPromCounter: Boolean, genOtelCumulativeHistData: Boolean, genOtelDeltaHistData: Boolean, + genPromCounter: Boolean, genOtelCumulativeHistData: Boolean, + genOtelDeltaHistData: Boolean, genOtelExpDeltaHistData: Boolean, publishIntervalSec: Int): Unit = { val startQuery = startTimeMs / 1000 val endQuery = startQuery + (numSamples / numMetrics / numTimeSeries) * publishIntervalSec @@ -72,7 +73,8 @@ object TestTimeseriesProducer extends StrictLogging { val metricName = if (genGauge) "heap_usage0" else if (genHist || genOtelCumulativeHistData) "http_request_latency" - else if (genDeltaHist || genOtelDeltaHistData) "http_request_latency_delta" + else if (genDeltaHist || genOtelDeltaHistData || genOtelExpDeltaHistData) + "http_request_latency_delta" else if (genPromCounter) "heap_usage_counter0" else "heap_usage_delta0" @@ -98,15 +100,19 @@ object TestTimeseriesProducer extends StrictLogging { dataset: Dataset, shardMapper: ShardMapper, spread: Int, - publishIntervalSec: Int): (Future[Unit], Observable[(Int, Seq[Array[Byte]])]) = { + publishIntervalSec: Int, + expHist: Boolean = false, + numBuckets: Int = 20): (Future[Unit], Observable[(Int, Seq[Array[Byte]])]) = { val (shardQueues, 
containerStream) = GatewayServer.shardingPipeline(GlobalConfig.systemConfig, numShards, dataset) val producingFut = Future { - timeSeriesData(startTimeMs, numTimeSeries, numMetricNames, publishIntervalSec, gauge) - .take(numSamples) + val data = if (expHist) genHistogramData(startTimeMs, numTimeSeries, + Schemas.otelDeltaHistogram, numBuckets = numBuckets, otelExponential = true) + else timeSeriesData(startTimeMs, numTimeSeries, numMetricNames, publishIntervalSec, gauge) + data.take(numSamples) .foreach { rec => val shard = shardMapper.ingestionShard(rec.shardKeyHash, rec.partitionKeyHash, spread) - while (!shardQueues(shard).offer(rec)) { Thread sleep 50 } + while (!shardQueues(shard).offer(rec)) { Thread sleep 50 } } } (producingFut, containerStream) @@ -203,10 +209,12 @@ object TestTimeseriesProducer extends StrictLogging { * Note: the set of "instance" tags is unique for each invocation of genHistogramData. This helps increase * the cardinality of time series for testing purposes. */ - def genHistogramData(startTime: Long, numTimeSeries: Int = 16, histSchema: Schema): Stream[InputRecord] = { - val numBuckets = 10 - val histBucketScheme = bv.GeometricBuckets(2.0, 3.0, numBuckets) - var buckets = new Array[Long](numBuckets) + def genHistogramData(startTime: Long, numTimeSeries: Int = 16, histSchema: Schema, + numBuckets : Int = 20, + otelExponential: Boolean = false): Stream[InputRecord] = { + val histBucketScheme = if (otelExponential) bv.Base2ExpHistogramBuckets(3, -numBuckets/2, numBuckets) + else bv.GeometricBuckets(2.0, 3.0, numBuckets) + var buckets = new Array[Long](histBucketScheme.numBuckets) val metric = if (Schemas.deltaHistogram == histSchema || Schemas.otelDeltaHistogram == histSchema) { "http_request_latency_delta" } else { @@ -214,7 +222,7 @@ object TestTimeseriesProducer extends StrictLogging { } def updateBuckets(bucketNo: Int): Unit = { - for { b <- bucketNo until numBuckets } { + for { b <- bucketNo until histBucketScheme.numBuckets } { 
buckets(b) += 1 } } @@ -232,9 +240,9 @@ object TestTimeseriesProducer extends StrictLogging { if ( (Schemas.deltaHistogram == histSchema || Schemas.otelDeltaHistogram == histSchema ) && prevTimestamp != timestamp) { prevTimestamp = timestamp - buckets = new Array[Long](numBuckets) + buckets = new Array[Long](histBucketScheme.numBuckets) } - updateBuckets(n % numBuckets) + updateBuckets(n % histBucketScheme.numBuckets) val hist = bv.LongHistogram(histBucketScheme, buckets.map(x => x)) val count = util.Random.nextInt(100).toDouble val sum = buckets.sum.toDouble diff --git a/gateway/src/test/scala/filodb/gateway/conversion/InputRecordBuilderSpec.scala b/gateway/src/test/scala/filodb/gateway/conversion/InputRecordBuilderSpec.scala index a374e25c67..954709b048 100644 --- a/gateway/src/test/scala/filodb/gateway/conversion/InputRecordBuilderSpec.scala +++ b/gateway/src/test/scala/filodb/gateway/conversion/InputRecordBuilderSpec.scala @@ -4,13 +4,14 @@ package filodb.gateway.conversion import filodb.core.binaryrecord2.RecordBuilder import filodb.core.metadata.Schemas import filodb.memory.MemFactory -import filodb.memory.format.vectors.{CustomBuckets, LongHistogram} +import filodb.memory.format.vectors.{Base2ExpHistogramBuckets, CustomBuckets, LongHistogram} import org.scalatest.funspec.AnyFunSpec import org.scalatest.matchers.should.Matchers class InputRecordBuilderSpec extends AnyFunSpec with Matchers { val builder = new RecordBuilder(MemFactory.onHeapFactory) val builder2 = new RecordBuilder(MemFactory.onHeapFactory) + val builder3 = new RecordBuilder(MemFactory.onHeapFactory) val baseTags = Map("dataset" -> "timeseries", "host" -> "MacBook-Pro-229.local", @@ -97,6 +98,32 @@ class InputRecordBuilderSpec extends AnyFunSpec with Matchers { } } + it("should otelExpDeltaHistogram to BR and be able to deserialize it") { + val bucketScheme = Base2ExpHistogramBuckets(3, -5, 10) + val bucketsCounts = Array(1L, 2, 3,4, 5, 6, 7, 8, 9, 10, 11) // require cumulative counts + val 
expected = LongHistogram(bucketScheme, bucketsCounts)
+
+    val bucketKVs = bucketsCounts.zipWithIndex.map {
+      case (bucketCount, i) => i.toString -> bucketCount.toDouble
+    }.toSeq
+
+    // add posBucketOffset and scale
+    val more = Seq("posBucketOffset" -> bucketScheme.startIndexPositiveBuckets.toDouble,
+                   "scale" -> bucketScheme.scale.toDouble)
+
+    InputRecord.writeOtelExponentialHistRecord(builder3, metric, baseTags, 100000L,
+                                               bucketKVs ++ sumCountMinMaxKVs ++ more, isDelta = true)
+    builder3.allContainers.head.iterate(Schemas.otelDeltaHistogram.ingestionSchema).foreach { row =>
+      row.getDouble(1) shouldEqual sum
+      row.getDouble(2) shouldEqual count
+      row.getDouble(4) shouldEqual min
+      row.getDouble(5) shouldEqual max
+      val hist = row.getHistogram(3).asInstanceOf[LongHistogram]
+      hist.buckets shouldEqual expected.buckets
+      hist.values shouldEqual Array(1L, 2, 3,4, 5, 6, 7, 8, 9, 10, 11)
+    }
+  }
+
   it("should skip empty histograms via writePromHistRecord, and write subsequent records") {
     builder.reset()
     InputRecord.writePromHistRecord(builder, metric, baseTags, 100000L, sumCountKVs)
diff --git a/grpc/src/main/protobuf/query_service.proto b/grpc/src/main/protobuf/query_service.proto
index 8608937c92..ea58539491 100644
--- a/grpc/src/main/protobuf/query_service.proto
+++ b/grpc/src/main/protobuf/query_service.proto
@@ -597,6 +597,7 @@ enum InstantFunctionId {
   MONTH = 20;
   YEAR = 21;
   OR_VECTOR_DOUBLE = 22;
+  HISTOGRAM_FRACTION = 23;
 }
 
 enum ScalarFunctionId {
diff --git a/jmh/src/main/scala/filodb.jmh/Base2ExponentialHistogramQueryBenchmark.scala b/jmh/src/main/scala/filodb.jmh/Base2ExponentialHistogramQueryBenchmark.scala
new file mode 100644
index 0000000000..b00038cf07
--- /dev/null
+++ b/jmh/src/main/scala/filodb.jmh/Base2ExponentialHistogramQueryBenchmark.scala
@@ -0,0 +1,165 @@
+package filodb.jmh
+
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+
+import akka.actor.ActorSystem
+import ch.qos.logback.classic.{Level, Logger}
+import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
+import com.typesafe.scalalogging.StrictLogging
+import monix.eval.Task
+import monix.reactive.Observable
+import org.openjdk.jmh.annotations._
+
+import filodb.coordinator.queryplanner.SingleClusterPlanner
+import filodb.core.SpreadChange
+import filodb.core.binaryrecord2.RecordContainer
+import filodb.core.memstore.{SomeData, TimeSeriesMemStore}
+import filodb.core.metadata.Schemas
+import filodb.core.query.{QueryConfig, QueryContext}
+import filodb.core.store.StoreConfig
+import filodb.prometheus.ast.TimeStepParams
+import filodb.prometheus.parse.Parser
+import filodb.query.{QueryError => QError, QueryResult => QueryResult2}
+import filodb.timeseries.TestTimeseriesProducer
+
+//scalastyle:off regex
+/**
+ * A macrobenchmark (IT-test level) for QueryEngine2 aggregations, in-memory only (no on-demand paging)
+ * No ingestion occurs while query test is running -- this is intentional to allow for query-focused CPU
+ * and memory allocation analysis.
+ * Ingests a fixed amount of generated otel-delta-histogram data with base-2 exponential buckets and runs
+ * a histogram_quantile range query against it.
+ */
+@State(Scope.Thread)
+class Base2ExponentialHistogramQueryBenchmark extends StrictLogging {
+  // NOTE(review): QueryInMemoryBenchmark logs filodb at INFO; confirm DEBUG is intended while measuring throughput
+  org.slf4j.LoggerFactory.getLogger("filodb").asInstanceOf[Logger].setLevel(Level.DEBUG)
+
+  import filodb.coordinator._
+  import client.Client.{actorAsk, asyncAsk}
+  import client.QueryCommands._
+  import NodeClusterActor._
+
+  val startTime = System.currentTimeMillis - (3600*1000)
+  val queryIntervalMin = 55  // # minutes between start and stop
+
+  // TODO: move setup and ingestion to another trait
+  val system = ActorSystem("test", ConfigFactory.load("filodb-defaults.conf")
+    .withValue("filodb.memstore.ingestion-buffer-mem-size", ConfigValueFactory.fromAnyRef("30MB")))
+
+  private val cluster = FilodbCluster(system)
+  cluster.join()
+
+  private val coordinator = cluster.coordinatorActor
+  private val clusterActor = cluster.clusterSingleton(ClusterRole.Server, None)
+
+  // Set up Prometheus dataset in cluster, initialize in metastore
+  private val dataset = TestTimeseriesProducer.dataset
+  private val shardMapper = new ShardMapper(numShards)
+  (0 until numShards).foreach { s =>
+    shardMapper.updateFromEvent(IngestionStarted(dataset.ref, s, coordinator))
+  }
+
+  Await.result(cluster.metaStore.initialize(), 3.seconds)
+
+  val storeConf = StoreConfig(ConfigFactory.parseString("""
+                  | flush-interval = 1h
+                  | shard-mem-size = 96MB
+                  | groups-per-shard = 4
+                  | demand-paging-enabled = false
+                  """.stripMargin))
+  val command = SetupDataset(dataset, DatasetResourceSpec(numShards, 1), noOpSource, storeConf, overrideSchema = false)
+  actorAsk(clusterActor, command) { case DatasetVerified => println(s"dataset setup") }
+  coordinator ! command
+
+  val queryConfig = QueryConfig(cluster.settings.allConfig.getConfig("filodb.query"))
+
+  import monix.execution.Scheduler.Implicits.global
+
+  final val numShards = 8
+  final val numSamplesPerTs = 720   // 2 hours * 3600 / 10 sec interval
+  final val numSeries = 100
+  final val numQueries = 100
+  final val numBuckets = 160
+  val spread = 3
+
+  // Manually pump in data ourselves so we know when it's done.
+  // TODO: ingest into multiple shards
+  Thread sleep 2000    // Give setup command some time to set up dataset shards etc.
+  val (producingFut, containerStream) = TestTimeseriesProducer.metricsToContainerStream(startTime, numShards, numSeries,
+    numMetricNames = 1, numSamplesPerTs * numSeries, dataset, shardMapper, spread,
+    publishIntervalSec = 10, numBuckets = numBuckets, expHist = true)
+  val ingestTask = containerStream.groupBy(_._1)
+    // Asynchronously subscribe and ingest each shard
+    .mapParallelUnordered(numShards) { groupedStream =>
+      val shard = groupedStream.key
+      println(s"Starting ingest exp histograms on shard $shard...")
+      val shardStream = groupedStream.zipWithIndex.flatMap { case ((_, bytes), idx) =>
+        val data = bytes.map { array => SomeData(RecordContainer(array), idx) }
+        Observable.fromIterable(data)
+      }
+      Task.fromFuture(cluster.memStore.startIngestion(dataset.ref, shard, shardStream, global))
+    }.countL.runToFuture
+  Await.result(producingFut, 90.seconds)
+  Thread sleep 10000
+  cluster.memStore.asInstanceOf[TimeSeriesMemStore].refreshIndexForTesting(dataset.ref) // commit lucene index
+  println(s"Ingestion ended")
+
+  // Stuff for directly executing queries ourselves
+  val engine = new SingleClusterPlanner(dataset, Schemas(dataset.schema), shardMapper, 0,
+    queryConfig, "raw")
+
+  /**
+   * ## ========  Queries ===========
+   * They are designed to match all the time series (common case) under a particular metric and job
+   */
+  val histQuantileQuery =
+    """histogram_quantile(0.7, sum(rate(http_request_latency_delta{_ws_="demo", _ns_="App-0"}[5m])))"""
+  val queries = Seq(histQuantileQuery)
+  val queryTime = startTime + (7 * 60 * 1000)  // queries start 7 minutes after startTime, span queryIntervalMin
+  val queryStep = 120        // # of seconds between each query sample "step"
+  val qParams = TimeStepParams(queryTime/1000, queryStep, (queryTime/1000) + queryIntervalMin*60)
+  val logicalPlans = queries.map { q => Parser.queryRangeToLogicalPlan(q, qParams) }
+  val queryCommands = logicalPlans.map { plan =>
+    LogicalPlan2Query(dataset.ref, plan, QueryContext(Some(new StaticSpreadProvider(SpreadChange(0, spread))), 20000))
+  }
+
+  var queriesSucceeded = 0
+  var queriesFailed = 0
+  var queryZeroResults = 0
+
+  // Shuts the cluster down, then fails the run if any query errored or returned no results
+  @TearDown
+  def shutdownFiloActors(): Unit = {
+    cluster.shutdown()
+    println(s"Succeeded: $queriesSucceeded   Failed: $queriesFailed   Zero Results: $queryZeroResults")
+    if (queriesFailed > 0) throw new RuntimeException(s"Queries failed: $queriesFailed")
+    if (queryZeroResults > 0) throw new RuntimeException(s"Queries with zero results: $queryZeroResults")
+  }
+
+  /**
+   * Sends numQueries range queries to the coordinator, waits up to 60 seconds for all of the replies,
+   * then tallies each reply as succeeded, zero-result or failed.
+   */
+  @Benchmark
+  @BenchmarkMode(Array(Mode.Throughput))
+  @OutputTimeUnit(TimeUnit.SECONDS)
+  @OperationsPerInvocation(numQueries)
+  def histQuantileRangeQueries(): Unit = {
+    val futures = (0 until numQueries).map { n =>
+      val qCmd = queryCommands(n % queryCommands.length)
+      val time = System.currentTimeMillis
+      asyncAsk(coordinator, qCmd.copy(qContext = qCmd.qContext.copy(queryId = n.toString, submitTime = time)))
+    }
+    // Tally on the benchmark thread once every reply is in. Counting inside Future callbacks raced across
+    // scheduler threads, and those callbacks could still be pending when this method or TearDown ran.
+    Await.result(Future.sequence(futures), 60.seconds).foreach {
+      case q: QueryResult2 => if (q.result.nonEmpty) queriesSucceeded += 1 else queryZeroResults += 1
+      case e: QError       => queriesFailed += 1
+      case _               => // any other reply is left uncounted
+    }
+  }
+}
diff --git a/jmh/src/main/scala/filodb.jmh/QueryInMemoryBenchmark.scala b/jmh/src/main/scala/filodb.jmh/QueryInMemoryBenchmark.scala
index 05700bcfdb..1ebe6b12de 100644
--- a/jmh/src/main/scala/filodb.jmh/QueryInMemoryBenchmark.scala
+++ b/jmh/src/main/scala/filodb.jmh/QueryInMemoryBenchmark.scala
@@ -9,7 +9,6 @@
import akka.actor.ActorSystem import ch.qos.logback.classic.{Level, Logger} import com.typesafe.config.{ConfigFactory, ConfigValueFactory} import com.typesafe.scalalogging.StrictLogging -import kamon.Kamon import monix.eval.Task import monix.execution.Scheduler import monix.reactive.Observable @@ -28,10 +27,11 @@ import filodb.query.{QueryError => QError, QueryResult => QueryResult2} import filodb.timeseries.TestTimeseriesProducer object Params { - final val numShards = 32 + final val numShards = 8 final val numSamples = 720 // 2 hours * 3600 / 10 sec interval final val numSeries = 100 final val numQueries = 100 + val spread = 3 } //scalastyle:off regex @@ -43,7 +43,6 @@ object Params { */ @State(Scope.Thread) class QueryInMemoryBenchmark extends StrictLogging { - Kamon.init() // Needed for metrics logging org.slf4j.LoggerFactory.getLogger("filodb").asInstanceOf[Logger].setLevel(Level.INFO) import filodb.coordinator._ @@ -51,13 +50,10 @@ class QueryInMemoryBenchmark extends StrictLogging { import client.QueryCommands._ import NodeClusterActor._ import Params._ - import filodb.standalone.SimpleProfiler - val prof = new SimpleProfiler(10, 120, 50) val startTime = System.currentTimeMillis - (3600*1000) val queryIntervalMin = 55 // # minutes between start and stop val queryStep = 150 // # of seconds between each query sample "step" - val spread = 5 // TODO: move setup and ingestion to another trait val system = ActorSystem("test", ConfigFactory.load("filodb-defaults.conf") @@ -84,7 +80,7 @@ class QueryInMemoryBenchmark extends StrictLogging { | groups-per-shard = 4 | demand-paging-enabled = false """.stripMargin)) - val command = SetupDataset(dataset, DatasetResourceSpec(numShards, 1), noOpSource, storeConf) + val command = SetupDataset(dataset, DatasetResourceSpec(numShards, 1), noOpSource, storeConf, overrideSchema = false) actorAsk(clusterActor, command) { case DatasetVerified => println(s"dataset setup") } coordinator ! 
command @@ -113,7 +109,6 @@ class QueryInMemoryBenchmark extends StrictLogging { Thread sleep 2000 cluster.memStore.asInstanceOf[TimeSeriesMemStore].refreshIndexForTesting(dataset.ref) // commit lucene index println(s"Ingestion ended") - prof.start() // Stuff for directly executing queries ourselves val engine = new SingleClusterPlanner(dataset, Schemas(dataset.schema), shardMapper, 0, @@ -139,12 +134,14 @@ class QueryInMemoryBenchmark extends StrictLogging { var queriesSucceeded = 0 var queriesFailed = 0 + var queryZeroResults = 0 @TearDown def shutdownFiloActors(): Unit = { cluster.shutdown() - println(s"Succeeded: $queriesSucceeded Failed: $queriesFailed") - prof.stop() + println(s"Succeeded: $queriesSucceeded Failed: $queriesFailed Zero Results: $queryZeroResults") + if (queriesFailed > 0) throw new RuntimeException(s"Queries failed: $queriesFailed") + if (queryZeroResults > 0) throw new RuntimeException(s"Queries with zero results: $queryZeroResults") } // Window = 5 min and step=2.5 min, so 50% overlap @@ -158,7 +155,7 @@ class QueryInMemoryBenchmark extends StrictLogging { val time = System.currentTimeMillis val f = asyncAsk(coordinator, qCmd.copy(qContext = qCmd.qContext.copy(queryId = n.toString, submitTime = time))) f.foreach { - case q: QueryResult2 => queriesSucceeded += 1 + case q: QueryResult2 => if (q.result.nonEmpty) queriesSucceeded += 1 else queryZeroResults += 1 case e: QError => queriesFailed += 1 } f @@ -183,7 +180,7 @@ class QueryInMemoryBenchmark extends StrictLogging { val time = System.currentTimeMillis val f = asyncAsk(coordinator, qCmd.copy(qContext = qCmd.qContext.copy(queryId = n.toString, submitTime = time))) f.foreach { - case q: QueryResult2 => queriesSucceeded += 1 + case q: QueryResult2 => if (q.result.nonEmpty) queriesSucceeded += 1 else queryZeroResults += 1 case e: QError => queriesFailed += 1 } f diff --git a/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala 
b/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala index 3073937ea4..8fef409563 100644 --- a/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala +++ b/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala @@ -7,6 +7,7 @@ import spire.syntax.cfor._ import filodb.memory.format._ + /** * A trait to represent bucket-based histograms as well as derived statistics such as sums or rates of * increasing histograms. @@ -14,6 +15,7 @@ import filodb.memory.format._ * or equal to the bucketTop ("upper bound"), thus it is cumulative with increasing bucket number. * Furthermore the LAST bucket should contain the total number of observations overall. */ +//scalastyle:off file.size.limit trait Histogram extends Ordered[Histogram] { def numBuckets: Int @@ -33,6 +35,8 @@ trait Histogram extends Ordered[Histogram] { */ def bucketValue(no: Int): Double + def hasExponentialBuckets: Boolean + /** * Returns an MutableDirectBuffer pointing to a serialized BinaryHistogram representation of this histogram. * @param buf if Some(buf) supplied, then that buf is either written into or re-used to wrap where the serialized @@ -68,9 +72,9 @@ trait Histogram extends Ordered[Histogram] { // using rank, find the le bucket which would have the identified rank val b = firstBucketGTE(rank) - // now calculate quantile. If bucket is last one then return second-to-last bucket top + // now calculate quantile. If bucket is last one and last bucket is +Inf then return second-to-last bucket top // as we cannot interpolate to +Inf. 
- if (b == numBuckets-1) return bucketTop(numBuckets-2) + if (b == numBuckets-1 && bucketTop(numBuckets - 1).isPosInfinity) return bucketTop(numBuckets-2) else if (b == 0 && bucketTop(0) <= 0) return bucketTop(0) else { // interpolate quantile within le bucket @@ -80,12 +84,109 @@ trait Histogram extends Ordered[Histogram] { count -= bucketValue(b-1) rank -= bucketValue(b-1) } - bucketStart + (bucketEnd-bucketStart)*(rank/count) + val fraction = rank/count + if (!hasExponentialBuckets || bucketStart == 0) { + bucketStart + (bucketEnd-bucketStart) * fraction + } else { + val logBucketEnd = log2(bucketEnd) + val logBucketStart = log2(bucketStart) + val logRank = logBucketStart + (logBucketEnd - logBucketStart) * fraction + Math.pow(2, logRank) + } } } result } + private def log2(v: Double) = Math.log(v) / Math.log(2) + + /** + * Adapted from histogram_fraction in Prometheus codebase, but modified to handle + * the fact that bucket values are cumulative. Also, if min and max are provided, + * then interpolation accuracy is improved. 
+ */ + //scalastyle:off cyclomatic.complexity + //scalastyle:off method.length + def histogramFraction(lower: Double, upper: Double, + min: Double = Double.NegativeInfinity, + max: Double = Double.PositiveInfinity): Double = { + require(lower >= 0 && upper >= 0, s"lower & upper params should be >= 0: lower=$lower, upper=$upper") + if (numBuckets == 0 || lower.isNaN || upper.isNaN || topBucketValue == 0) { + return Double.NaN + } + val count = topBucketValue + + if (lower == 0 && upper == 0) { + return bucketValue(0) / count + } + if (lower >= upper) { + return 0.0 + } + + var lowerRank = 0.0 + var upperRank = 0.0 + var lowerSet = false + var upperSet = false + val it = (0 until numBuckets).iterator + + while (it.hasNext && (!lowerSet || !upperSet)) { + val b = it.next() + val zeroBucket = (b == 0) + val bucketUpper = bucketTop(b) + val bucketLower = if (b == 0) 0.0 else bucketTop(b - 1) + val bucketVal = bucketValue(b) + val prevBucketVal = if (b == 0) 0.0 else bucketValue(b - 1) + + // Define interpolation functions + def interpolateLinearly(v: Double): Double = { + val low = Math.max(bucketLower, min) + val high = Math.min(bucketUpper, max) + val fraction = (v - low) / (high - low) + prevBucketVal + (bucketVal - prevBucketVal) * fraction + } + + def interpolateExponentially(v: Double) = { + val low = Math.max(bucketLower, min) + val high = Math.min(bucketUpper, max) + val logLower = log2(Math.abs(low)) + val logUpper = log2(Math.abs(high)) + val logV = log2(Math.abs(v)) + val fraction = if (v > 0) (logV - logLower) / (logUpper - logLower) + else 1 - ((logV - logUpper) / (logLower - logUpper)) + prevBucketVal + (bucketVal - prevBucketVal) * fraction + } + + if (!lowerSet && bucketLower == lower) { + // We have hit the lower value at the lower bucket boundary. + lowerRank = prevBucketVal + lowerSet = true + } + if (!upperSet && bucketUpper == upper) { + // We have hit the upper value at the upper bucket boundary.
+ upperRank = bucketVal + upperSet = true + } + + if (!lowerSet && bucketLower < lower && bucketUpper > lower) { + // The lower value is in this bucket + lowerRank = if (!hasExponentialBuckets || zeroBucket) interpolateLinearly(lower) + else interpolateExponentially(lower) + lowerSet = true + } + if (!upperSet && bucketLower < upper && bucketUpper > upper) { + // The upper value is in this bucket + upperRank = if (!hasExponentialBuckets || zeroBucket) interpolateLinearly(upper) + else interpolateExponentially(upper) + upperSet = true + } + } + + if (!lowerSet || lowerRank > count) lowerRank = count + if (!upperSet || upperRank > count) upperRank = count + + (upperRank - lowerRank) / count + } + /** * Compares two Histograms for equality. * If the # buckets or bucket bounds are not equal, just compare the top bucket value. @@ -135,6 +236,7 @@ trait HistogramWithBuckets extends Histogram { } values } + def hasExponentialBuckets: Boolean = buckets.isInstanceOf[Base2ExpHistogramBuckets] } object HistogramWithBuckets { @@ -142,7 +244,7 @@ object HistogramWithBuckets { val empty = LongHistogram(HistogramBuckets.emptyBuckets, Array[Long]()) } -final case class LongHistogram(buckets: HistogramBuckets, values: Array[Long]) extends HistogramWithBuckets { +final case class LongHistogram(var buckets: HistogramBuckets, var values: Array[Long]) extends HistogramWithBuckets { final def bucketValue(no: Int): Double = values(no).toDouble final def serialize(intoBuf: Option[MutableDirectBuffer] = None): MutableDirectBuffer = { val buf = intoBuf.getOrElse(BinaryHistogram.histBuf) @@ -160,6 +262,9 @@ final case class LongHistogram(buckets: HistogramBuckets, values: Array[Long]) e s"Expected: ${buckets}, Found: ${other.buckets}" ) } + // TODO if otel histogram, the need to add values in a different way + // see if we can refactor since MutableHistogram also has this logic + assert(other.buckets == buckets) cforRange { 0 until numBuckets } { b => values(b) += other.values(b) } @@ 
-191,12 +296,16 @@ object LongHistogram { /** * A histogram class that can be used for aggregation and to represent intermediate values */ -final case class MutableHistogram(buckets: HistogramBuckets, values: Array[Double]) extends HistogramWithBuckets { +final case class MutableHistogram(var buckets: HistogramBuckets, + var values: Array[Double]) extends HistogramWithBuckets { + require(buckets.numBuckets == values.size, s"Invalid number of values: ${values.size} != ${buckets.numBuckets}") + final def bucketValue(no: Int): Double = values(no) final def serialize(intoBuf: Option[MutableDirectBuffer] = None): MutableDirectBuffer = { val buf = intoBuf.getOrElse(BinaryHistogram.histBuf) buckets match { case g: GeometricBuckets if g.minusOne => BinaryHistogram.writeDelta(g, values.map(_.toLong), buf) + case g: Base2ExpHistogramBuckets => BinaryHistogram.writeDelta(g, values.map(_.toLong), buf) case _ => BinaryHistogram.writeDoubles(buckets, values, buf) } buf @@ -229,6 +338,7 @@ final case class MutableHistogram(buckets: HistogramBuckets, values: Array[Doubl * @param other Histogram to be added * @return true when input histogram has same schema and false when schema is different */ + // scalastyle:off method.length final def addNoCorrection(other: HistogramWithBuckets): Boolean = { // Allow addition when type of bucket is different if (buckets.similarForMath(other.buckets)) { @@ -238,7 +348,31 @@ final case class MutableHistogram(buckets: HistogramBuckets, values: Array[Doubl values(b) += other.bucketValue(b) } true - } else { + } else if (buckets.isInstanceOf[Base2ExpHistogramBuckets] && + other.buckets.isInstanceOf[Base2ExpHistogramBuckets]) { + // If it was NaN before, reset to 0 to sum another hist + if (java.lang.Double.isNaN(values(0))) java.util.Arrays.fill(values, 0.0) + val ourBuckets = buckets.asInstanceOf[Base2ExpHistogramBuckets] + val otherBuckets = other.buckets.asInstanceOf[Base2ExpHistogramBuckets] + // if our buckets is subset of other buckets, then 
we can add the values + if (ourBuckets.canAccommodate(otherBuckets)) { + ourBuckets.addValues(values, otherBuckets, other) + // since we are making the exp histogram buckets cumulative during + // ingestion, we can assume cumulative bucket values + false + } else { + val newBuckets = ourBuckets.add(otherBuckets) // create new buckets that can accommodate both + val newValues = new Array[Double](newBuckets.numBuckets) // new values array + newBuckets.addValues(newValues, ourBuckets, this) + newBuckets.addValues(newValues, otherBuckets, other) + buckets = newBuckets + values = newValues + // since we are making the exp histogram buckets cumulative during + // ingestion, we can assume cumulative bucket values + false + } + } + else { cforRange { 0 until numBuckets } { b => values(b) = Double.NaN } @@ -310,7 +444,7 @@ object MutableHistogram { * UPDATE: Adding support for min value as well to make sure the quantile is never below the minimum specified value. * By default, the minimum value is set to 0.0 */ -final case class MaxMinHistogram(innerHist: MutableHistogram, max: Double, min: Double = 0.0) +final case class MaxMinHistogram(innerHist: HistogramWithBuckets, max: Double, min: Double = 0.0) extends HistogramWithBuckets { final def buckets: HistogramBuckets = innerHist.buckets final def bucketValue(no: Int): Double = innerHist.bucketValue(no) @@ -343,6 +477,8 @@ final case class MaxMinHistogram(innerHist: MutableHistogram, max: Double, min: } result } + + } /** @@ -410,6 +546,7 @@ object HistogramBuckets { def apply(buffer: DirectBuffer, formatCode: Byte): HistogramBuckets = formatCode match { case HistFormat_Geometric_Delta => geometric(buffer.byteArray, buffer.addressOffset + 2, false) case HistFormat_Geometric1_Delta => geometric(buffer.byteArray, buffer.addressOffset + 2, true) + case HistFormat_OtelExp_Delta => otelExp(buffer.byteArray, buffer.addressOffset + 2) case HistFormat_Custom_Delta => custom(buffer.byteArray, buffer.addressOffset) case _ => 
emptyBuckets } @@ -418,6 +555,7 @@ object HistogramBuckets { def apply(acc: MemoryReader, bucketsDef: Ptr.U8, formatCode: Byte): HistogramBuckets = formatCode match { case HistFormat_Geometric_Delta => geometric(acc.base, acc.baseOffset + bucketsDef.add(2).addr, false) case HistFormat_Geometric1_Delta => geometric(acc.base, acc.baseOffset + bucketsDef.add(2).addr, true) + case HistFormat_OtelExp_Delta => otelExp(acc.base, acc.baseOffset + bucketsDef.add(2).addr) case HistFormat_Custom_Delta => custom(acc.base, acc.baseOffset + bucketsDef.addr) case _ => emptyBuckets } @@ -430,6 +568,16 @@ object HistogramBuckets { UnsafeUtils.getShort(bucketsDefBase, bucketsDefOffset + OffsetNumBuckets).toInt, minusOne) + def otelExp(bucketsDefBase: Array[Byte], bucketsDefOffset: Long): HistogramBuckets = { + val scale = UnsafeUtils.getShort(bucketsDefBase, bucketsDefOffset + OffsetBucketDetails) + val startPosBucket = UnsafeUtils.getShort(bucketsDefBase, bucketsDefOffset + OffsetBucketDetails + 2) + val numPosBuckets = UnsafeUtils.getShort(bucketsDefBase, bucketsDefOffset + OffsetBucketDetails + 4) + // uncomment when negative buckets are supported + // val startNegBucket = UnsafeUtils.getShort(bucketsDefBase, bucketsDefOffset + OffsetBucketDetails + 6) + // val numNegBuckets = UnsafeUtils.getShort(bucketsDefBase, bucketsDefOffset + OffsetBucketDetails + 8) + Base2ExpHistogramBuckets(scale, startPosBucket, numPosBuckets) + } + /** * Creates a CustomBuckets definition. * @param bucketsDefOffset must point to the 2-byte length prefix of the bucket definition @@ -485,6 +633,205 @@ final case class GeometricBuckets(firstBucket: Double, } } +object Base2ExpHistogramBuckets { + // TODO: make maxBuckets default configurable; not straightforward to get handle to global config from here + val maxBuckets = 200 + val maxAbsScale = 100 + val maxAbsBucketIndex = 500 +} + +/** + * Open Telemetry Base2 Exponential Histogram Bucket Scheme. 
+ * See https://github.com/open-telemetry/opentelemetry-proto/blob/7312bdf63218acf27fe96430b7231de37fd091f2/ + * opentelemetry/proto/metrics/v1/metrics.proto#L500 + * for the definition. + * + * Easier explanation here: https://dyladan.me/histograms/2023/05/04/exponential-histograms/ + * + * The bucket scheme is based on the formula: + * `bucket(index) = base ^ (index + 1)` + * where index is the bucket index, and `base = 2 ^ 2 ^ -scale` + * + * Negative observations are not supported yet. But serialization format is forward compatible. + * + * @param scale OTel metrics ExponentialHistogramDataPoint proto scale value + * @param startIndexPositiveBuckets offset for positive buckets from the same proto + * @param numPositiveBuckets length of the positive buckets array from the same proto + */ +final case class Base2ExpHistogramBuckets(scale: Int, + startIndexPositiveBuckets: Int, + numPositiveBuckets: Int + ) extends HistogramBuckets { + import Base2ExpHistogramBuckets._ + require(numPositiveBuckets <= maxBuckets && numPositiveBuckets >= 0, + s"Invalid buckets: numPositiveBuckets=$numPositiveBuckets maxBuckets=${maxBuckets}") + // limit bucket index and scale values since the corresponding bucketTop values can get very large or very small and + // may tend to very very large or small values above double precision. 
Note we are also serializing + // these as shorts in the binary format, so they need to be within limits of short + require(startIndexPositiveBuckets > -maxAbsBucketIndex && startIndexPositiveBuckets < maxAbsBucketIndex, + s"Invalid startIndexPositiveBuckets $startIndexPositiveBuckets should be between " + + s"${-maxAbsBucketIndex} and ${maxAbsBucketIndex}") + require(scale > -maxAbsScale && scale < maxAbsScale, + s"Invalid scale $scale should be between ${-maxAbsScale} and ${maxAbsScale}") + + val base: Double = Math.pow(2, Math.pow(2, -scale)) + val startBucketTop: Double = bucketTop(1) + val endBucketTop: Double = bucketTop(numBuckets - 1) + + override def numBuckets: Int = numPositiveBuckets + 1 // add one for zero count + + final def bucketTop(no: Int): Double = { + if (no ==0) 0.0 else { + // From OTel metrics proto docs: + // The histogram bucket identified by `index`, a signed integer, + // contains values that are greater than (base^index) and + // less than or equal to (base^(index+1)). + val index = startIndexPositiveBuckets + no - 1 + Math.pow(base, index + 1) + } + } + + final def serialize(buf: MutableDirectBuffer, pos: Int): Int = { + + // Negative observations are not supported yet, so startIndexNegativeBuckets and numNegativeBuckets are always 0. 
+ // They are here since the proto has it (but the client SDKs do not), and we want to be able to support it + // when the spec changes + val startIndexNegativeBuckets = 0 + val numNegativeBuckets = 0 + require(numBuckets < Short.MaxValue, s"numBucket overflow: $numBuckets") + require(startIndexPositiveBuckets < Short.MaxValue, + s"startIndexPositiveBuckets overflow: $startIndexPositiveBuckets") + require(scale < Short.MaxValue, s"scale overflow: $scale") + require(numPositiveBuckets < Short.MaxValue, s"numPositiveBuckets overflow $numPositiveBuckets") + // per BinHistogram format, bucket def len comes first always + buf.putShort(pos, (2 + 2 + 4 + 4).toShort) + // numBuckets comes next always + val numBucketsPos = pos + 2 + buf.putShort(numBucketsPos, numBuckets.toShort, LITTLE_ENDIAN) + // now bucket format specific data + val bucketSchemeFieldsPos = pos + 4 + buf.putShort(bucketSchemeFieldsPos, scale.toShort, LITTLE_ENDIAN) + buf.putShort(bucketSchemeFieldsPos + 2, startIndexPositiveBuckets.toShort, LITTLE_ENDIAN) + buf.putShort(bucketSchemeFieldsPos + 4, numPositiveBuckets.toShort, LITTLE_ENDIAN) + buf.putShort(bucketSchemeFieldsPos + 6, startIndexNegativeBuckets.toShort, LITTLE_ENDIAN) + buf.putShort(bucketSchemeFieldsPos + 8, numNegativeBuckets.toShort, LITTLE_ENDIAN) + pos + 14 + } + + override def toString: String = { + s"OTelExpHistogramBuckets(scale=$scale, startIndexPositiveBuckets=$startIndexPositiveBuckets, " + + s"numPositiveBuckets=$numPositiveBuckets) ${super.toString}" + } + override def similarForMath(other: HistogramBuckets): Boolean = { + other match { + case c: Base2ExpHistogramBuckets => this.scale == c.scale && + this.startIndexPositiveBuckets == c.startIndexPositiveBuckets && + this.numPositiveBuckets == c.numPositiveBuckets + case _ => false + } + } + + def canAccommodate(other: Base2ExpHistogramBuckets): Boolean = { + // Can we do better? 
There can be double's == problems here + endBucketTop >= other.endBucketTop && startBucketTop <= other.startBucketTop + } + + def add(o: Base2ExpHistogramBuckets, + maxBuckets: Int = Base2ExpHistogramBuckets.maxBuckets): Base2ExpHistogramBuckets = { + if (canAccommodate(o)) { + this + } else { + // calculate new bucket scheme that can accommodate both bucket ranges + val minBucketTopNeeded = Math.min(startBucketTop, o.startBucketTop) + val maxBucketTopNeeded = Math.max(endBucketTop, o.endBucketTop) + var newScale = Math.min(scale, o.scale) + var newBase = Math.max(base, o.base) + var newBucketIndexEnd = Math.ceil(Math.log(maxBucketTopNeeded) / Math.log(newBase)).toInt - 1 + var newBucketIndexStart = Math.floor(Math.log(minBucketTopNeeded) / Math.log(newBase)).toInt - 1 + // Even if the two schemes are of same scale, they can have non-overlapping bucket ranges. + // The new bucket scheme should have at most maxBuckets, so keep reducing scale until within limits. + while (newBucketIndexEnd - newBucketIndexStart > maxBuckets) { + newScale -= 1 + newBase = Math.pow(2, Math.pow(2, -newScale)) + newBucketIndexEnd = Math.ceil(Math.log(maxBucketTopNeeded) / Math.log(newBase)).toInt - 1 + newBucketIndexStart = Math.floor(Math.log(minBucketTopNeeded) / Math.log(newBase)).toInt - 1 + } + Base2ExpHistogramBuckets(newScale, newBucketIndexStart, newBucketIndexEnd - newBucketIndexStart + 1) + } + } + + /** + * Converts an OTel exponential index to array index. + * For example if startIndexPositiveBuckets = -5 and numPositiveBuckets = 10, then + * -5 will return 1, -4 will return 2, 0 will return 6, 4 will return 10. + * Know that 0 array index is reserved for zero bucket. + */ + def bucketIndexToArrayIndex(index: Int): Int = index - startIndexPositiveBuckets + 1 + + /** + * Add the otherValues using otherBuckets scheme to ourValues. This method assumes (a) ourValues uses "this" + * bucket scheme and (b) this bucket scheme can accommodate otherBuckets.
+ */ + def addValues(ourValues: Array[Double], + otherBuckets: Base2ExpHistogramBuckets, + otherHistogram: HistogramWithBuckets): Unit = { + // TODO consider removing the require once code is stable + require(ourValues.length == numBuckets) + require(otherHistogram.numBuckets == otherBuckets.numBuckets) + require(canAccommodate(otherBuckets)) + val scaleIncrease = otherBuckets.scale - scale + // for each ourValues, find the bucket index in otherValues and add the value + val fac = Math.pow(2 , scaleIncrease).toInt + ourValues(0) += otherHistogram.bucketValue(0) // add the zero bucket + // For this algorithm we take advantage of the pattern that there is a relationship between scale + // and the bucketIndexPlus1. When scale increases by scaleIncrease, + // then the otherBucketIndexPlus1 for the same bucket in the other scheme + // is 2^scaleIncrease (which is fac) times otherBucketIndexPlus1. + cforRange { startIndexPositiveBuckets until startIndexPositiveBuckets + numPositiveBuckets } { ourBucketIndex => + val ourBucketIndexPlus1 = ourBucketIndex + 1 + val otherBucketIndexPlus1 = ourBucketIndexPlus1 * fac + val ourArrayIndex = bucketIndexToArrayIndex(ourBucketIndexPlus1 - 1) + // ourArrayIndex is guaranteed to be within bounds since we are iterating our bucket range + + val otherArrayIndex = otherBuckets.bucketIndexToArrayIndex(otherBucketIndexPlus1 - 1) + // there is a possibility that otherArrayIndex is out of bounds, so we need to check + + if (otherArrayIndex > 0 && otherArrayIndex < otherBuckets.numBuckets) { // add if within bounds + + // Example 1: add every 2nd bucket from other to our bucket + // other scale = 2 . . . . . . . . . . . + // ↓ ↓ ↓ ↓ ↓ ↓ + // our scale = 1 . . . . . . + // + // Example 2: add every 4th bucket from other to our bucket + // other scale = 3 . . . . . . . . . . . . . + // ↓ ↓ ↓ ↓ + // our scale = 1 . . . . 
+ + // TODO consider removing the require once code is stable + require(ourArrayIndex != 0, "double counting zero bucket") + require( Math.abs(bucketTop(ourArrayIndex) - otherBuckets.bucketTop(otherArrayIndex)) <= 0.000001, + s"Bucket tops of $ourArrayIndex and $otherArrayIndex don't match:" + + s" ${bucketTop(ourArrayIndex)} != ${otherBuckets.bucketTop(otherArrayIndex)}") + ourValues(ourArrayIndex) += otherHistogram.bucketValue(otherArrayIndex) + } else if (otherArrayIndex >= otherBuckets.numBuckets) { + // if otherArrayIndex higher than upper bound, add to our last bucket since bucket counts are cumulative + + // Example 3: add every 4th bucket from other to our + // other scale = 4 . . . . . . . . . . . . X add last bucket X here + // ↓ ↓ ↓ ↓ ↓ ↓ + // our scale = 2 . . . . . . + + // TODO consider removing the require once code is stable + require(ourArrayIndex != 0, "double counting zero bucket") + ourValues(ourArrayIndex) += otherHistogram.bucketValue(otherBuckets.numBuckets - 1) + } + // if otherArrayIndex is less than lower bound, the counts are already included and we don't need + // to add since the counts are cumulative + } + } +} + /** * A bucketing scheme with custom bucket/LE values. 
* diff --git a/memory/src/main/scala/filodb.memory/format/vectors/HistogramVector.scala b/memory/src/main/scala/filodb.memory/format/vectors/HistogramVector.scala index d9ca52558c..034df15cbb 100644 --- a/memory/src/main/scala/filodb.memory/format/vectors/HistogramVector.scala +++ b/memory/src/main/scala/filodb.memory/format/vectors/HistogramVector.scala @@ -23,6 +23,7 @@ import filodb.memory.format.MemoryReader._ * 0x03 geometric + NibblePacked delta Long values * 0x04 geometric_1 + NibblePacked delta Long values (see [[HistogramBuckets]]) * 0x05 custom LE/bucket values + NibblePacked delta Long values + * 0x09 otelExp_Delta + NibblePacked delta Long values (see [[Base2ExpHistogramBuckets]]) * * +0003 u16 2-byte length of Histogram bucket definition * +0005 [u8] Histogram bucket definition, see [[HistogramBuckets]] @@ -63,6 +64,9 @@ object BinaryHistogram extends StrictLogging { case HistFormat_Geometric1_Delta => val bucketDef = HistogramBuckets.geometric(buf.byteArray, bucketDefOffset, true) LongHistogram.fromPacked(bucketDef, valuesByteSlice).getOrElse(Histogram.empty) + case HistFormat_OtelExp_Delta => + val bucketDef = HistogramBuckets.otelExp(buf.byteArray, bucketDefOffset) + LongHistogram.fromPacked(bucketDef, valuesByteSlice).getOrElse(Histogram.empty) case HistFormat_Custom_Delta => val bucketDef = HistogramBuckets.custom(buf.byteArray, bucketDefOffset - 2) LongHistogram.fromPacked(bucketDef, valuesByteSlice).getOrElse(Histogram.empty) @@ -102,13 +106,16 @@ object BinaryHistogram extends StrictLogging { val HistFormat_Null = 0x00.toByte val HistFormat_Geometric_Delta = 0x03.toByte val HistFormat_Geometric1_Delta = 0x04.toByte + val HistFormat_OtelExp_Delta = 0x09.toByte val HistFormat_Custom_Delta = 0x05.toByte val HistFormat_Geometric_XOR = 0x08.toByte // Double values XOR compressed val HistFormat_Custom_XOR = 0x0a.toByte - def isValidFormatCode(code: Byte): Boolean = + def isValidFormatCode(code: Byte): Boolean = { (code == HistFormat_Null) || (code == 
HistFormat_Geometric1_Delta) || (code == HistFormat_Geometric_Delta) || - (code == HistFormat_Custom_Delta) + (code == HistFormat_Custom_Delta) || (code == HistFormat_OtelExp_Delta) + // Question: why are other formats like HistFormat_Geometric_XOR not here as valid ? + } /** * Writes binary histogram with geometric bucket definition and data which is non-increasing, but will be @@ -143,11 +150,12 @@ object BinaryHistogram extends StrictLogging { * @return the number of bytes written, including the length prefix */ def writeDelta(buckets: HistogramBuckets, values: Array[Long], buf: MutableDirectBuffer): Int = { - require(buckets.numBuckets == values.size, s"Values array size of ${values.size} != ${buckets.numBuckets}") + require(buckets.numBuckets == values.length, s"Values array size of ${values.length} != ${buckets.numBuckets}") val formatCode = if (buckets.numBuckets == 0) HistFormat_Null else buckets match { case g: GeometricBuckets if g.minusOne => HistFormat_Geometric1_Delta case g: GeometricBuckets => HistFormat_Geometric_Delta case c: CustomBuckets => HistFormat_Custom_Delta + case o: Base2ExpHistogramBuckets => HistFormat_OtelExp_Delta } buf.putByte(2, formatCode) @@ -172,6 +180,7 @@ object BinaryHistogram extends StrictLogging { val formatCode = if (buckets.numBuckets == 0) HistFormat_Null else buckets match { case g: GeometricBuckets => HistFormat_Geometric_XOR case c: CustomBuckets => HistFormat_Custom_XOR + case o: Base2ExpHistogramBuckets => HistFormat_OtelExp_Delta } buf.putByte(2, formatCode) @@ -605,6 +614,7 @@ class SectDeltaHistogramReader(acc2: MemoryReader, histVect: Ptr.U8) def detectDropAndCorrection(accNotUsed: MemoryReader, vectorNotUsed: BinaryVectorPtr, meta: CorrectionMeta): CorrectionMeta = meta match { + // TODO deal with exponential histogram counter correction case NoCorrection => meta // No last value, cannot compare. Just pass it on. 
case h @ HistogramCorrection(lastValue, correction) => val firstValue = apply(0) diff --git a/memory/src/test/scala/filodb.memory/format/vectors/HistogramTest.scala b/memory/src/test/scala/filodb.memory/format/vectors/HistogramTest.scala index 85cc5e798e..3aebdf8e05 100644 --- a/memory/src/test/scala/filodb.memory/format/vectors/HistogramTest.scala +++ b/memory/src/test/scala/filodb.memory/format/vectors/HistogramTest.scala @@ -25,6 +25,11 @@ object HistogramTest { LongHistogram(customScheme, buckets.take(customScheme.numBuckets).map(_.toLong)) } + val otelExpBuckets = Base2ExpHistogramBuckets(3, -3, 7) + val otelExpHistograms = rawHistBuckets.map { buckets => + LongHistogram(otelExpBuckets, buckets.take(otelExpBuckets.numBuckets).map(_.toLong)) + } + val correction1 = LongHistogram(bucketScheme, Array(1, 2, 3, 4, 5, 6, 7, 8)) val correction2 = LongHistogram(bucketScheme, Array(2, 4, 6, 8, 10, 12, 14, 18)) @@ -58,38 +63,109 @@ class HistogramTest extends NativeVectorTest { customScheme.serialize(writeBuf, 0) shouldEqual 26 HistogramBuckets(writeBuf, HistFormat_Custom_Delta) shouldEqual customScheme + + val buckets3 = Base2ExpHistogramBuckets(3, -5, 16) + buckets3.serialize(writeBuf, 0) shouldEqual 14 + HistogramBuckets(writeBuf, HistFormat_OtelExp_Delta) shouldEqual buckets3 } } describe("Histogram") { - it("should add two histograms with the same bucket scheme correctly") { - val hist1 = LongHistogram(bucketScheme, Array(1, 2, 3, 4, 5, 6, 7, 8)) - val hist2 = LongHistogram(bucketScheme, Array(2, 4, 6, 8, 10, 12, 14, 18)) - hist1.add(hist2) - hist1.values shouldEqual Array(3, 6, 9, 12, 15, 18, 21, 26) - } + it("should add two histograms with the same bucket scheme correctly") { + val hist1 = LongHistogram(bucketScheme, Array(1, 2, 3, 4, 5, 6, 7, 8)) + val hist2 = LongHistogram(bucketScheme, Array(2, 4, 6, 8, 10, 12, 14, 18)) + hist1.add(hist2) - it("should not add histograms with different bucket schemes") { - val hist1 = LongHistogram(bucketScheme, Array(1, 2, 3, 
4, 5, 6, 7, 8)) - val histWithDifferentBuckets = LongHistogram(customScheme, Array(1, 2, 3, 4, 5, 6, 7)) + hist1.values shouldEqual Array(3, 6, 9, 12, 15, 18, 21, 26) + } - val thrown = intercept[IllegalArgumentException] { - hist1.add(histWithDifferentBuckets) - } + it("should not add histograms with different bucket schemes") { + val hist1 = LongHistogram(bucketScheme, Array(1, 2, 3, 4, 5, 6, 7, 8)) + val histWithDifferentBuckets = LongHistogram(customScheme, Array(1, 2, 3, 4, 5, 6, 7)) - thrown.getMessage shouldEqual s"Mismatch in bucket sizes. Cannot add histograms with different bucket configurations. " + - s"Expected: ${hist1.buckets}, Found: ${histWithDifferentBuckets.buckets}" + val thrown = intercept[IllegalArgumentException] { + hist1.add(histWithDifferentBuckets) } - it("should calculate quantile correctly") { + + thrown.getMessage shouldEqual s"Mismatch in bucket sizes. Cannot add histograms with different bucket configurations. " + + s"Expected: ${hist1.buckets}, Found: ${histWithDifferentBuckets.buckets}" + } + + it("should calculate quantile correctly for custom and geometric bucket histograms") { mutableHistograms.zip(quantile50Result).foreach { case (h, res) => val quantile = h.quantile(0.50) info(s"For histogram ${h.values.toList} -> quantile = $quantile") quantile shouldEqual res } - // Cannot return anything more than 2nd-to-last bucket (ie 64) - mutableHistograms(0).quantile(0.95) shouldEqual 64 + // Cannot return anything more than 2nd-to-last bucket (ie 64) when last bucket is infinity + customHistograms(0).quantile(0.95) shouldEqual 10 + } + + + it("should calculate quantile correctly for exponential bucket histograms") { + val bucketScheme = Base2ExpHistogramBuckets(3, -5, 11) // 0.707 to 1.68 + val hist = MutableHistogram(bucketScheme, Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)) + + // multiple buckets with interpolation + hist.quantile(0.5) shouldEqual 1.0 +- 0.00001 + hist.quantile(0.75) shouldEqual 1.2968395546510099 +- 0.00001 + 
hist.quantile(0.25) shouldEqual 0.7711054127039704 +- 0.00001 + hist.quantile(0.99) shouldEqual 1.6643974694230492 +- 0.00001 + hist.quantile(0.01) shouldEqual 0.0 // zero bucket + hist.quantile(0.085) shouldEqual 0.014142135623730961 +- 0.00001 + } + + it("should calculate histogram_fraction correctly for exponential histograms using exponential interpolation") { + val bucketScheme = Base2ExpHistogramBuckets(3, -5, 11) // 0.707 to 1.68 + val hist = MutableHistogram(bucketScheme, Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)) + + // multiple buckets with interpolation + hist.histogramFraction(0.8, 1.2) shouldEqual 0.3899750004807707 +- 0.00001 + + // multiple buckets with interpolation using min and max leads to improved accuracy + hist.histogramFraction(0.8, 1.2, 0.76, 1.21) shouldEqual 0.42472117149283567 +- 0.00001 + + // multiple buckets without interpolation + hist.histogramFraction(bucketScheme.bucketTop(3), + bucketScheme.bucketTop(7)) shouldEqual ((hist.bucketValue(7) - hist.bucketValue(3)) / hist.topBucketValue) +- 0.00001 + + // zero bucket + hist.histogramFraction(0, 0) shouldEqual 0.0833333 +- 0.00001 + + // beyond last bucket + hist.histogramFraction(2.0, 2.1) shouldEqual 0.0 + + // one bucket + hist.histogramFraction(1.0, 1.09) shouldEqual 0.08288542333480116 +- 0.00001 + + // all buckets + hist.histogramFraction(0, 2) shouldEqual 1.0 + } + + it("should calculate histogram_fraction correctly for custom bucket histograms using linear interpolation") { + val bucketScheme = CustomBuckets(Array(1,2,3,4,5,6,7,8,9,10,11,Double.PositiveInfinity)) + val hist = MutableHistogram(bucketScheme, Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)) + + // multiple buckets with linear interpolation + hist.histogramFraction(4.5, 6.6) shouldEqual 0.175 +- 0.00001 + + // multiple buckets with interpolation using min and max leads to improved accuracy + hist.histogramFraction(4.5, 6.6, 4.1, 6.8) shouldEqual 0.19212962962962962 +- 0.00001 + + // multiple buckets without 
interpolation + hist.histogramFraction(bucketScheme.bucketTop(3), + bucketScheme.bucketTop(7)) shouldEqual ((hist.bucketValue(7) - hist.bucketValue(3)) / hist.topBucketValue) +- 0.00001 + + // beyond last bucket + hist.histogramFraction(11.1, 12.1) shouldEqual 0.0 + + // one bucket + hist.histogramFraction(1.0, 1.09) shouldEqual 0.0075 +- 0.00001 + + // all buckets + hist.histogramFraction(0, Double.PositiveInfinity) shouldEqual 1.0 } it("should calculate more accurate quantile with MaxMinHistogram using max column") { @@ -215,5 +291,91 @@ class HistogramTest extends NativeVectorTest { current = d } } + + it("should return correct values for Base2ExpHistogramBuckets.bucketIndexToArrayIndex(index: Int): Int") { + val b1 = Base2ExpHistogramBuckets(3, -5, 11) // 0.707 to 1.68 + b1.bucketIndexToArrayIndex(-5) shouldEqual 1 + b1.bucketIndexToArrayIndex(-4) shouldEqual 2 + b1.bucketIndexToArrayIndex(0) shouldEqual 6 + b1.bucketIndexToArrayIndex(5) shouldEqual 11 + } + + it("should create OTel exponential buckets and add them correctly") { + val b1 = Base2ExpHistogramBuckets(3, -5, 11) // 0.707 to 1.68 + b1.numBuckets shouldEqual 12 + b1.startBucketTop shouldBe 0.7071067811865475 +- 0.0001 + b1.endBucketTop shouldBe 1.6817928305074294 +- 0.0001 + + b1.bucketIndexToArrayIndex(-5) shouldEqual 1 + b1.bucketIndexToArrayIndex(5) shouldEqual 11 + b1.bucketTop(b1.bucketIndexToArrayIndex(-1)) shouldEqual 1.0 + + val b2 = Base2ExpHistogramBuckets(2, -2, 7) // 0.8408 to 2.378 + val b3 = Base2ExpHistogramBuckets(2, -4, 9) // 0.594 to 2.378 + b1.canAccommodate(b2) shouldEqual false + b2.canAccommodate(b1) shouldEqual false + b3.canAccommodate(b1) shouldEqual true + b3.canAccommodate(b2) shouldEqual true + + val bAdd = b1.add(b2) + bAdd.numBuckets shouldEqual 10 + bAdd.scale shouldEqual 2 + bAdd.startBucketTop shouldBe 0.5946035575013606 +- 0.0001 + bAdd.endBucketTop shouldBe 2.378414245732675 +- 0.0001 + bAdd.canAccommodate(b1) shouldEqual true + bAdd.canAccommodate(b2) shouldEqual 
true + + val bAddValues = new Array[Double](bAdd.numBuckets) + bAdd.addValues(bAddValues, b1, MutableHistogram(b1, Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11))) + bAddValues.toSeq shouldEqual Seq(0.0, 0.0, 1.0, 3.0, 5.0, 7.0, 9.0, 11.0, 11.0, 11.0) + bAdd.addValues(bAddValues, b2, MutableHistogram(b2, Array(0, 1, 2, 3, 4, 5, 6, 7))) + bAddValues.toSeq shouldEqual Seq(0.0, 0.0, 1.0, 4.0, 7.0, 10.0, 13.0, 16.0, 17.0, 18.0) + + val b4 = Base2ExpHistogramBuckets(5, 15, 36) // 1.414 to 3.018 + b4.numBuckets shouldEqual 37 + b4.startBucketTop shouldBe 1.414213562373094 +- 0.0001 + b4.endBucketTop shouldBe 3.0183288551868377 +- 0.0001 + + val b5 = Base2ExpHistogramBuckets(3, 10, 6) + b5.numBuckets shouldEqual 7 + b5.startBucketTop shouldBe 2.59367910930202 +- 0.0001 + b5.endBucketTop shouldBe 4.000000000000002 +- 0.0001 + + val bAdd2 = bAdd.add(b4).add(b5) + val bAdd2Values = new Array[Double](bAdd2.numBuckets) + bAdd2.addValues(bAdd2Values, bAdd, MutableHistogram(bAdd, bAddValues)) + bAdd2Values.toSeq shouldEqual Seq(0.0, 0.0, 1.0, 4.0, 7.0, 10.0, 13.0, 16.0, 17.0, 18.0, 18.0, 18.0, 18.0, 18.0) + + bAdd2.addValues(bAdd2Values, b4, MutableHistogram(b4, (0 until 37 map (i => i.toDouble)).toArray)) + bAdd2Values.toSeq shouldEqual Seq(0.0, 0.0, 1.0, 4.0, 7.0, 10.0, 14.0, 25.0, 34.0, 43.0, 51.0, 54.0, 54.0, 54.0) + + bAdd2.addValues(bAdd2Values, b5, MutableHistogram(b5, Array(0.0, 10.0, 11, 12, 13, 14, 15))) + bAdd2Values.toSeq shouldEqual Seq(0.0, 0.0, 1.0, 4.0, 7.0, 10.0, 14.0, 25.0, 34.0, 43.0, 62.0, 67.0, 69.0, 69.0) + + } + + it("should create non-overlapping OTel exponential buckets and add them correctly") { + val b1 = Base2ExpHistogramBuckets(3, -5, 11) + val b2 = Base2ExpHistogramBuckets(3, 15, 11) + val b3 = b1.add(b2) + b3.numBuckets shouldEqual 32 + b3.startIndexPositiveBuckets shouldEqual -5 + } + + it("should reduce scale when more than 120 buckets to keep memory and compute in check") { + val b1 = Base2ExpHistogramBuckets(6, -50, 21) + val b2 = 
Base2ExpHistogramBuckets(6, 100, 26) + val add1 = b1.add(b2, maxBuckets = 128) + add1 shouldEqual Base2ExpHistogramBuckets(5, -26, 90) + add1.canAccommodate(b1) shouldEqual true + add1.canAccommodate(b2) shouldEqual true + + val add2 = b1.add(b2, maxBuckets = 64) + add2 shouldEqual Base2ExpHistogramBuckets(4, -14, 46) + add2.canAccommodate(b1) shouldEqual true + add2.canAccommodate(b2) shouldEqual true + } + + } } \ No newline at end of file diff --git a/memory/src/test/scala/filodb.memory/format/vectors/HistogramVectorTest.scala b/memory/src/test/scala/filodb.memory/format/vectors/HistogramVectorTest.scala index 5d3c34f77c..58ab343f25 100644 --- a/memory/src/test/scala/filodb.memory/format/vectors/HistogramVectorTest.scala +++ b/memory/src/test/scala/filodb.memory/format/vectors/HistogramVectorTest.scala @@ -1,11 +1,9 @@ package filodb.memory.format.vectors import java.nio.ByteBuffer - import org.agrona.concurrent.UnsafeBuffer - import filodb.memory.format._ -import filodb.memory.format.vectors.BinaryHistogram.{BinHistogram, HistFormat_Geometric1_Delta} +import filodb.memory.format.vectors.BinaryHistogram.{BinHistogram, HistFormat_Geometric1_Delta, HistFormat_OtelExp_Delta} class HistogramVectorTest extends NativeVectorTest { import HistogramTest._ @@ -72,6 +70,54 @@ class HistogramVectorTest extends NativeVectorTest { } } + it("should accept LongHistograms with otel exp scheme and query them back") { + val appender = HistogramVector.appending(memFactory, 1024) + otelExpHistograms.foreach { custHist => + custHist.serialize(Some(buffer)) + appender.addData(buffer) shouldEqual Ack + } + + appender.length shouldEqual otelExpHistograms.length + + val reader = appender.reader.asInstanceOf[RowHistogramReader] + reader.length shouldEqual otelExpHistograms.length + + (0 until otelExpHistograms.length).foreach { i => + reader(i) shouldEqual otelExpHistograms(i) + } + } + + it("should accept MutableHistograms with otel exp scheme and query them back") { + val appender = 
HistogramVector.appending(memFactory, 1024) + val mutHistograms = otelExpHistograms.map(h => MutableHistogram(h.buckets, h.valueArray)) + mutHistograms.foreach { custHist => + custHist.serialize(Some(buffer)) + appender.addData(buffer) shouldEqual Ack + } + appender.length shouldEqual mutHistograms.length + val reader = appender.reader.asInstanceOf[RowHistogramReader] + reader.length shouldEqual mutHistograms.length + + (0 until otelExpHistograms.length).foreach { i => + val h = reader(i) + h.buckets shouldEqual otelExpHistograms.head.buckets + h shouldEqual mutHistograms(i) + } + } + + it("Bin Histogram from otel exponential histogram should go through serialize and query cycle correctly") { + val appender = HistogramVector.appending(memFactory, 1024) + val otelExpBuckets = Base2ExpHistogramBuckets(3, -20, 41) + val h = MutableHistogram(otelExpBuckets, Array.fill(42)(1L)) + h.serialize(Some(buffer)) + appender.addData(buffer) shouldEqual Ack + val mutableHisto = appender.reader.asHistReader.sum(0,0) + val binHist = BinHistogram(mutableHisto.serialize()) + binHist.formatCode shouldEqual HistFormat_OtelExp_Delta + val deserHist = binHist.toHistogram + deserHist shouldEqual h + } + it("should optimize histograms and be able to query optimized vectors") { val appender = HistogramVector.appending(memFactory, 1024) rawLongBuckets.foreach { rawBuckets => diff --git a/query/src/main/scala/filodb/query/PlanEnums.scala b/query/src/main/scala/filodb/query/PlanEnums.scala index 99d3300821..9333761b0b 100644 --- a/query/src/main/scala/filodb/query/PlanEnums.scala +++ b/query/src/main/scala/filodb/query/PlanEnums.scala @@ -15,6 +15,7 @@ object InstantFunctionId extends Enum[InstantFunctionId] { case object Exp extends InstantFunctionId("exp") case object Floor extends InstantFunctionId("floor") case object HistogramQuantile extends InstantFunctionId("histogram_quantile") + case object HistogramFraction extends InstantFunctionId("histogram_fraction") case object 
HistogramMaxQuantile extends InstantFunctionId("histogram_max_quantile") case object HistogramBucket extends InstantFunctionId("histogram_bucket") case object Ln extends InstantFunctionId("ln") diff --git a/query/src/main/scala/filodb/query/exec/HistogramQuantileMapper.scala b/query/src/main/scala/filodb/query/exec/HistogramQuantileMapper.scala index e8020260b0..eb343f6f4c 100644 --- a/query/src/main/scala/filodb/query/exec/HistogramQuantileMapper.scala +++ b/query/src/main/scala/filodb/query/exec/HistogramQuantileMapper.scala @@ -115,6 +115,7 @@ final case class HistogramQuantileMapper(funcParams: Seq[FuncArgs]) extends Rang final def bucketTop(no: Int): Double = buckets(no).le final def bucketValue(no: Int): Double = buckets(no).rate final def serialize(intoBuf: Option[MutableDirectBuffer] = None): MutableDirectBuffer = ??? + final def hasExponentialBuckets: Boolean = false } /** diff --git a/query/src/main/scala/filodb/query/exec/rangefn/InstantFunction.scala b/query/src/main/scala/filodb/query/exec/rangefn/InstantFunction.scala index 57f45eb861..76dfaeb0ad 100644 --- a/query/src/main/scala/filodb/query/exec/rangefn/InstantFunction.scala +++ b/query/src/main/scala/filodb/query/exec/rangefn/InstantFunction.scala @@ -114,6 +114,7 @@ object InstantFunction { if (sourceSchema.isHistMaxMin) HistogramQuantileWithMaxMinImpl() else HistogramQuantileImpl() case HistogramMaxQuantile => HistogramMaxQuantileImpl() case HistogramBucket => HistogramBucketImpl() + case HistogramFraction => HistogramFractionImpl() case _ => throw new UnsupportedOperationException(s"$function not supported.") } } @@ -364,6 +365,15 @@ final case class HistogramQuantileImpl() extends HistToDoubleIFunction { } } +final case class HistogramFractionImpl() extends HistToDoubleIFunction { + final def apply(value: Histogram, scalarParams: Seq[Double]): Double = { + require(scalarParams.length == 2, "Need two params for histogram fraction function") + val lower = scalarParams(0) + val upper = 
scalarParams(1) + value.histogramFraction(lower, upper) + } +} + /** * Histogram quantile function for Histogram columns, where all buckets are together. This will take in consideration * of min and max columns diff --git a/query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala b/query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala index 2d156307a9..84a6868191 100644 --- a/query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala +++ b/query/src/main/scala/filodb/query/exec/rangefn/RateFunctions.scala @@ -5,6 +5,7 @@ import spire.syntax.cfor._ import filodb.core.query.{QueryConfig, TransientHistRow, TransientRow} import filodb.memory.format.{vectors => bv, BinaryVector, CounterVectorReader, MemoryReader, VectorDataReader} import filodb.memory.format.BinaryVector.BinaryVectorPtr +import filodb.memory.format.vectors.Base2ExpHistogramBuckets import filodb.query.exec.FiloQueryConfig object RateFunctions { @@ -307,6 +308,18 @@ abstract class HistogramRateFunctionBase extends CounterChunkedRangeFunction[Tra isCounter, isRate) } sampleToEmit.setValues(windowEnd, bv.MutableHistogram(lowestValue.buckets, rateArray)) + } else if (highestValue.buckets.isInstanceOf[Base2ExpHistogramBuckets] && + lowestValue.buckets.isInstanceOf[Base2ExpHistogramBuckets]) { + // Assume highestValue.buckets.scale <= lowestValue.buckets.scale since client always + // reduces scale over time. This is confirmed in java otel sdk. + val hvb = highestValue.buckets.asInstanceOf[Base2ExpHistogramBuckets] + val lvb = lowestValue.buckets.asInstanceOf[Base2ExpHistogramBuckets] + if (hvb.canAccommodate(lvb)) { + // TODO then handle rate calculation for different bucket scheme (due to difference in scale or buckets) + ??? 
+ } else { + sampleToEmit.setValues(windowEnd, bv.HistogramWithBuckets.empty) + } } else { sampleToEmit.setValues(windowEnd, bv.HistogramWithBuckets.empty) } diff --git a/run_benchmarks.sh b/run_benchmarks.sh index 30acb54659..458898ca25 100755 --- a/run_benchmarks.sh +++ b/run_benchmarks.sh @@ -1,6 +1,7 @@ #!/bin/bash sbt -Drust.optimize=true "jmh/jmh:run -rf json -i 5 -wi 3 -f 1 -jvmArgsAppend -XX:MaxInlineLevel=20 \ - -jvmArgsAppend -Xmx4g -jvmArgsAppend -XX:MaxInlineSize=99 \ + -jvmArgsAppend -Xmx4g -jvmArgsAppend -XX:MaxInlineSize=99 -jvmArgsAppend -Dkamon.enabled=false \ + filodb.jmh.Base2ExponentialHistogramQueryBenchmark \ filodb.jmh.QueryHiCardInMemoryBenchmark \ filodb.jmh.QueryInMemoryBenchmark \ filodb.jmh.QueryAndIngestBenchmark \ @@ -9,3 +10,5 @@ sbt -Drust.optimize=true "jmh/jmh:run -rf json -i 5 -wi 3 -f 1 -jvmArgsAppend -X filodb.jmh.GatewayBenchmark \ filodb.jmh.PartKeyLuceneIndexBenchmark \ filodb.jmh.PartKeyTantivyIndexBenchmark" + +# -prof 'async:libPath=/path/to/async-profiler-3.0-macos/lib/libasyncProfiler.dylib;event=cpu;output=flamegraphdir=./profile-results' \ \ No newline at end of file diff --git a/spark-jobs/src/test/scala/filodb/downsampler/DownsamplerMainSpec.scala b/spark-jobs/src/test/scala/filodb/downsampler/DownsamplerMainSpec.scala index cd2e1d5dc6..2b10c9ac61 100644 --- a/spark-jobs/src/test/scala/filodb/downsampler/DownsamplerMainSpec.scala +++ b/spark-jobs/src/test/scala/filodb/downsampler/DownsamplerMainSpec.scala @@ -15,7 +15,7 @@ import filodb.core.{DatasetRef, MachineMetricsData} import filodb.downsampler.chunk.{BatchDownsampler, BatchExporter, Downsampler, DownsamplerSettings} import filodb.downsampler.index.{DSIndexJobSettings, IndexJobDriver} import filodb.memory.format.ZeroCopyUTF8String._ -import filodb.memory.format.vectors.{CustomBuckets, DoubleVector, LongHistogram} +import filodb.memory.format.vectors.{CustomBuckets, DoubleVector, LongHistogram, Base2ExpHistogramBuckets} import 
filodb.memory.format.{PrimitiveVectorReader, UnsafeUtils} import filodb.query.exec._ import filodb.query.{QueryError, QueryResult} @@ -34,7 +34,6 @@ import org.scalatest.matchers.should.Matchers import org.scalatest.time.{Millis, Seconds, Span} import org.apache.spark.sql.types._ - import java.io.File import java.time import java.time.Instant @@ -133,7 +132,9 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl var histNaNPartKeyBytes: Array[Byte] = _ val deltaHistName = "my_delta_histogram" + val otelExpDeltaHistName = "my_exp_delta_histogram" val deltaHistNameNaN = "my_histogram_NaN" + var expDeltaHistPartKeyBytes: Array[Byte] = _ var deltaHistPartKeyBytes: Array[Byte] = _ var deltaHistNaNPartKeyBytes: Array[Byte] = _ @@ -1088,6 +1089,53 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl rawColStore.writePartKeys(rawDataset.ref, shard, Observable.now(pk), 259200, pkUpdateHour).futureValue } + it("should write otel exponential delta histogram data to cassandra") { + + val rawDataset = Dataset("prometheus", Schemas.deltaHistogram) + + val partBuilder = new RecordBuilder(offheapMem.nativeMemoryManager) + val partKey = partBuilder.partKeyFromObjects(Schemas.deltaHistogram, otelExpDeltaHistName, seriesTags) + + val part = new TimeSeriesPartition(0, Schemas.deltaHistogram, partKey, shardInfo, 1) + + expDeltaHistPartKeyBytes = part.partKeyBytes + + val bucketScheme1 = Base2ExpHistogramBuckets(3, -1, 3) + val bucketScheme2 = Base2ExpHistogramBuckets(2, -1, 3) + val rawSamples = Stream( // time, sum, count, hist, name, tags + Seq(74372801000L, 0d, 1d, LongHistogram(bucketScheme1, Array(0L, 0L, 0, 1)), otelExpDeltaHistName, seriesTags), + Seq(74372801500L, 2d, 2d, LongHistogram(bucketScheme1, Array(0L, 0L, 2, 2)), otelExpDeltaHistName, seriesTags), + Seq(74372802000L, 3d, 3d, LongHistogram(bucketScheme2, Array(0L, 2L, 3, 3)), otelExpDeltaHistName, seriesTags), + + Seq(74372861000L, 4d, 3d, 
LongHistogram(bucketScheme2, Array(0L, 0L, 0, 3)), otelExpDeltaHistName, seriesTags), + Seq(74372861500L, 1d, 1d, LongHistogram(bucketScheme2, Array(0L, 0L, 0, 1)), otelExpDeltaHistName, seriesTags), + Seq(74372862000L, 1d, 4d, LongHistogram(bucketScheme2, Array(0L, 0L, 3, 4)), otelExpDeltaHistName, seriesTags), + + Seq(74372921000L, 2d, 2d, LongHistogram(bucketScheme1, Array(0L, 0L, 0, 2)), otelExpDeltaHistName, seriesTags), + Seq(74372921500L, 5d, 7d, LongHistogram(bucketScheme1, Array(0L, 1L, 1, 7)), otelExpDeltaHistName, seriesTags), + Seq(74372922000L, 8d, 10d, LongHistogram(bucketScheme2, Array(0L, 0L, 8, 10)), otelExpDeltaHistName, seriesTags), + + Seq(74372981000L, 2d, 2d, LongHistogram(bucketScheme2, Array(0L, 1L, 1, 2)), otelExpDeltaHistName, seriesTags), + Seq(74372981500L, 1d, 1d, LongHistogram(bucketScheme2, Array(0L, 0L, 1, 1)), otelExpDeltaHistName, seriesTags), + Seq(74372982000L, 14d, 14d, LongHistogram(bucketScheme2, Array(0L, 0L, 14, 14)), otelExpDeltaHistName, seriesTags), + + Seq(74373041000L, 3d, 4d, LongHistogram(bucketScheme1, Array(0L, 1L, 1, 4)), otelExpDeltaHistName, seriesTags), + Seq(74373042000L, 2d, 6d, LongHistogram(bucketScheme1, Array(0L, 3L, 4, 6)), otelExpDeltaHistName, seriesTags) + ) + + MachineMetricsData.records(rawDataset, rawSamples).records.foreach { case (base, offset) => + val rr = new BinaryRecordRowReader(Schemas.deltaHistogram.ingestionSchema, base, offset) + part.ingest(lastSampleTime, rr, offheapMem.blockMemFactory, createChunkAtFlushBoundary = false, + flushIntervalMillis = Option.empty, acceptDuplicateSamples = false) + } + part.switchBuffers(offheapMem.blockMemFactory, true) + val chunks = part.makeFlushChunks(offheapMem.blockMemFactory) + + rawColStore.write(rawDataset.ref, Observable.fromIteratorUnsafe(chunks)).futureValue + val pk = PartKeyRecord(expDeltaHistPartKeyBytes, 74372801000L, currTime, shard) + rawColStore.writePartKeys(rawDataset.ref, shard, Observable.now(pk), 259200, pkUpdateHour).futureValue + } + 
it ("should write prom histogram data with NaNs to cassandra") { val rawDataset = Dataset("prometheus", Schemas.promHistogram) @@ -1498,6 +1546,45 @@ class DownsamplerMainSpec extends AnyFunSpec with Matchers with BeforeAndAfterAl ) } + it("should read and verify exponential delta histogram data in cassandra using " + + "PagedReadablePartition for 1-min downsampled data") { + + val downsampledPartData1 = downsampleColStore.readRawPartitions( + batchDownsampler.downsampleRefsByRes(FiniteDuration(1, "min")), + 0, + SinglePartitionScan(expDeltaHistPartKeyBytes)) + .toListL.runToFuture.futureValue.head + + val downsampledPart1 = new PagedReadablePartition(Schemas.deltaHistogram.downsample.get, 0, 0, + downsampledPartData1, 5.minutes.toMillis.toInt) + + downsampledPart1.partKeyBytes shouldEqual expDeltaHistPartKeyBytes + + val ctrChunkInfo = downsampledPart1.infos(AllChunkScan).nextInfoReader + PrimitiveVectorReader.dropped(ctrChunkInfo.vectorAccessor(2), ctrChunkInfo.vectorAddress(2)) shouldEqual false + + val rv1 = RawDataRangeVector(CustomRangeVectorKey.empty, downsampledPart1, AllChunkScan, Array(0, 1, 2, 3), + new AtomicLong(), Long.MaxValue, "query-id") + + val downsampledData1 = rv1.rows.map { r => + val h = r.getHistogram(3).asInstanceOf[LongHistogram] + val b = h.buckets.asInstanceOf[Base2ExpHistogramBuckets] + (r.getLong(0), r.getDouble(1), r.getDouble(2), b, h.values.toSeq) + }.toList + + // time, sum, count, histogram, bucket values + // new sample for every bucket-scheme and downsamplePeriod combination + downsampledData1 shouldEqual Seq( + (74372801500L,2.0,3.0, Base2ExpHistogramBuckets(3, -1, 3), Seq(0, 0, 2, 3)), + (74372802000L,3.0,3.0,Base2ExpHistogramBuckets(2, -1, 3), Seq(0, 0, 3, 8)), + (74372861750L,6.0,8.0,Base2ExpHistogramBuckets(2, -1, 3), Seq(0, 0, 3, 8)), + (74372921500L,7.0,9.0,Base2ExpHistogramBuckets(3, -1, 3), Seq(0, 1, 1, 9)), + (74372922000L,8.0,10.0,Base2ExpHistogramBuckets(2, -1, 3), Seq(0, 1, 16, 17)), + 
(74372982000L,17.0,17.0,Base2ExpHistogramBuckets(2, -1, 3), Seq(0, 1, 16, 17)), + (74373042000L,5.0,10.0,Base2ExpHistogramBuckets(3, -1, 3), Seq(0, 4, 5, 10)) + ) + } + it("should read and verify prom histogram data with NaNs in cassandra using " + "PagedReadablePartition for 1-min downsampled data") {