diff --git a/coordinator/src/main/scala/filodb.coordinator/NodeClusterActor.scala b/coordinator/src/main/scala/filodb.coordinator/NodeClusterActor.scala index 71e6bb4aa6..b4df27a496 100644 --- a/coordinator/src/main/scala/filodb.coordinator/NodeClusterActor.scala +++ b/coordinator/src/main/scala/filodb.coordinator/NodeClusterActor.scala @@ -55,6 +55,7 @@ object NodeClusterActor { * @param source the IngestionSource on each node. Use noOpSource to not start ingestion and * manually push records into NodeCoordinator. * @param storeConfig a StoreConfig for the MemStore. + * @param overrideSchema if true, will override the schema in the MetaStore with the schema in the Dataset. * @return DatasetVerified - meaning the dataset and columns are valid. Does not mean ingestion is * setup on all nodes - for that, subscribe to ShardMapUpdate's */ diff --git a/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala b/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala index a19929bd28..d53677d95b 100644 --- a/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala +++ b/memory/src/main/scala/filodb.memory/format/vectors/Histogram.scala @@ -249,7 +249,9 @@ final case class MutableHistogram(var buckets: HistogramBuckets, // if our buckets is subset of other buckets, then we can add the values if (ourBuckets.canAccommodate(otherBuckets)) { ourBuckets.addValues(values, otherBuckets, other) - true + // since we are making the exp histogram buckets cumulative during + // ingestion, we can assume cumulative bucket values + false } else { val newBuckets = ourBuckets.add(otherBuckets) // create new buckets that can accommodate both val newValues = new Array[Double](newBuckets.numBuckets) // new values array @@ -257,7 +259,9 @@ final case class MutableHistogram(var buckets: HistogramBuckets, newBuckets.addValues(newValues, otherBuckets, other) buckets = newBuckets values = newValues - true + // since we are making the exp histogram buckets cumulative during + // ingestion, we can assume cumulative bucket values + false } } else { @@ -521,6 +525,8 @@ final case class GeometricBuckets(firstBucket: Double, object Base2ExpHistogramBuckets { // TODO: make maxBuckets default configurable; not straightforward to get handle to global config from here val maxBuckets = 200 + val maxAbsScale = 100 + val maxAbsBucketIndex = 500 } /** @@ -548,10 +554,19 @@ final case class Base2ExpHistogramBuckets(scale: Int, startIndexNegativeBuckets: Int = 0, numNegativeBuckets: Int = 0 ) extends HistogramBuckets { - require(numPositiveBuckets <= Base2ExpHistogramBuckets.maxBuckets && numPositiveBuckets >= 0, - s"Invalid buckets: numPositiveBuckets=$numPositiveBuckets maxBuckets=${Base2ExpHistogramBuckets.maxBuckets}") + import Base2ExpHistogramBuckets._ + require(numPositiveBuckets <= maxBuckets && numPositiveBuckets >= 0, + s"Invalid buckets: numPositiveBuckets=$numPositiveBuckets maxBuckets=${maxBuckets}") require(numNegativeBuckets == 0 && startIndexNegativeBuckets == 0, "Negative buckets not supported yet") - require(startIndexPositiveBuckets > -500 && startIndexPositiveBuckets < 500) + // limit bucket index and scale values since the corresponding bucketTop values can get very large or very small and + // may tend to very very large or small values above double precision. Note we are also serializing + // these as shorts in the binary format, so they need to be within limits of short + require(startIndexPositiveBuckets > -maxAbsBucketIndex && startIndexPositiveBuckets < maxAbsBucketIndex, + s"Invalid startIndexPositiveBuckets $startIndexPositiveBuckets should be between " + + s"${-maxAbsBucketIndex} and ${maxAbsBucketIndex}") + require(scale > -maxAbsScale && scale < maxAbsScale, + s"Invalid scale $scale should be between ${-maxAbsScale} and ${maxAbsScale}") + val base: Double = Math.pow(2, Math.pow(2, -scale)) val startBucketTop: Double = bucketTop(1) val endBucketTop: Double = bucketTop(numBuckets - 1) @@ -572,8 +587,11 @@ final case class Base2ExpHistogramBuckets(scale: Int, } final def serialize(buf: MutableDirectBuffer, pos: Int): Int = { - require(scale < 100 && scale > -100, s"Unsupported scale $scale") - require(numBuckets < 65536, s"Too many buckets: $numBuckets") + require(numBuckets < Short.MaxValue, s"numBucket overflow: $numBuckets") + require(startIndexPositiveBuckets < Short.MaxValue, + s"startIndexPositiveBuckets overflow: $startIndexPositiveBuckets") + require(scale < Short.MaxValue, s"scale overflow: $scale") + require(numPositiveBuckets < Short.MaxValue, s"numPositiveBuckets overflow $numPositiveBuckets") // per BinHistogram format, bucket def len comes first always buf.putShort(pos, (2 + 2 + 4 + 4).toShort) // numBuckets comes next always @@ -612,10 +630,11 @@ final case class Base2ExpHistogramBuckets(scale: Int, if (canAccommodate(o)) { this } else { + // calculate new bucket scheme that can accommodate both bucket ranges val minBucketTopNeeded = Math.min(startBucketTop, o.startBucketTop) val maxBucketTopNeeded = Math.max(endBucketTop, o.endBucketTop) var newScale = Math.min(scale, o.scale) - var newBase = Math.pow(2, Math.pow(2, -newScale)) + var newBase = Math.max(base, o.base) var newBucketIndexEnd = Math.ceil(Math.log(maxBucketTopNeeded) / Math.log(newBase)).toInt - 1 var newBucketIndexStart = Math.floor(Math.log(minBucketTopNeeded) / Math.log(newBase)).toInt - 1 while (newBucketIndexEnd - newBucketIndexStart > maxBuckets) { @@ -642,13 +661,14 @@ final case class Base2ExpHistogramBuckets(scale: Int, // for each ourValues, find the bucket index in otherValues and add the value val fac = Math.pow(2 , otherBuckets.scale - scale).toInt ourValues(0) += otherHistogram.bucketValue(0) // add the zero bucket - cforRange { 0 until ourValues.length - 1 } { i => - val ourBucketIndexPlus1 = startIndexPositiveBuckets + i + 1 + cforRange { startIndexPositiveBuckets until startIndexPositiveBuckets + numPositiveBuckets } { ourBucketIndex => + val ourBucketIndexPlus1 = ourBucketIndex + 1 val otherBucketIndexPlus1 = ourBucketIndexPlus1 * fac val ourArrayIndex = bucketIndexToArrayIndex(ourBucketIndexPlus1 - 1) val otherArrayIndex = otherBuckets.bucketIndexToArrayIndex(otherBucketIndexPlus1 - 1) if (otherArrayIndex > 0 && otherArrayIndex < otherBuckets.numBuckets) { // TODO remove this require once code is stable + require(ourArrayIndex != 0, "double counting zero bucket") require( Math.abs(bucketTop(ourArrayIndex) - otherBuckets.bucketTop(otherArrayIndex)) <= 0.000001, s"Bucket tops of $ourArrayIndex and $otherArrayIndex don't match:" + s" ${bucketTop(ourArrayIndex)} != ${otherBuckets.bucketTop(otherArrayIndex)}") diff --git a/memory/src/main/scala/filodb.memory/format/vectors/HistogramVector.scala b/memory/src/main/scala/filodb.memory/format/vectors/HistogramVector.scala index b4bbdb30bc..034df15cbb 100644 --- a/memory/src/main/scala/filodb.memory/format/vectors/HistogramVector.scala +++ b/memory/src/main/scala/filodb.memory/format/vectors/HistogramVector.scala @@ -23,7 +23,7 @@ import filodb.memory.format.MemoryReader._ * 0x03 geometric + NibblePacked delta Long values * 0x04 geometric_1 + NibblePacked delta Long values (see [[HistogramBuckets]]) * 0x05 custom LE/bucket values + NibblePacked delta Long values - * 0x09 geometric_2 + NibblePacked delta Long values (see [[Base2ExpHistogramBuckets]]) + * 0x09 otelExp_Delta + NibblePacked delta Long values (see [[Base2ExpHistogramBuckets]]) * * +0003 u16 2-byte length of Histogram bucket definition * +0005 [u8] Histogram bucket definition, see [[HistogramBuckets]] @@ -114,7 +114,7 @@ object BinaryHistogram extends StrictLogging { def isValidFormatCode(code: Byte): Boolean = { (code == HistFormat_Null) || (code == HistFormat_Geometric1_Delta) || (code == HistFormat_Geometric_Delta) || (code == HistFormat_Custom_Delta) || (code == HistFormat_OtelExp_Delta) - // Question: why are other formats not here as valid ? + // Question: why are other formats like HistFormat_Geometric_XOR not here as valid ? } /** @@ -150,7 +150,7 @@ object BinaryHistogram extends StrictLogging { * @return the number of bytes written, including the length prefix */ def writeDelta(buckets: HistogramBuckets, values: Array[Long], buf: MutableDirectBuffer): Int = { - require(buckets.numBuckets == values.size, s"Values array size of ${values.size} != ${buckets.numBuckets}") + require(buckets.numBuckets == values.length, s"Values array size of ${values.length} != ${buckets.numBuckets}") val formatCode = if (buckets.numBuckets == 0) HistFormat_Null else buckets match { case g: GeometricBuckets if g.minusOne => HistFormat_Geometric1_Delta case g: GeometricBuckets => HistFormat_Geometric_Delta